NCBI C++ ToolKit
processing.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: processing.cpp 101212 2023-11-16 16:09:11Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Rafael Sadyrov
27  *
28  */
29 
30 #include <ncbi_pch.hpp>
31 
32 #include <numeric>
33 #include <unordered_set>
34 #include <unordered_map>
35 
40 #include <serial/enumvalues.hpp>
41 #include <serial/objcopy.hpp>
42 #include <serial/objistr.hpp>
43 #include <serial/objostr.hpp>
44 #include <util/checksum.hpp>
45 #include <util/compress/zlib.hpp>
46 #include <util/compress/stream.hpp>
47 
48 #include "performance.hpp"
49 #include "processing.hpp"
50 
52 
54 bool SParams::verbose = false;
55 
60 };
61 
63 {
66  {
69  }
70 
72  {
74  }
75 
77 
78 private:
80  void operator=(const SNewRequestContext&) = delete;
81 
83 };
84 
86 {
87  SInteractiveNewRequestStart(const string& request, CJson_ConstObject params_obj);
88 
89 private:
91  {
92  SExtra() : CDiagContext_Extra(GetDiagContext().PrintRequestStart()) {}
93 
94  void Print(const string& prefix, CJson_ConstValue json);
95  void Print(const string& prefix, CJson_ConstArray json);
96  void Print(const string& prefix, CJson_ConstObject json);
97  void Print(const string& prefix, CJson_ConstNode json);
98 
100  };
101 };
102 
103 string s_GetId(const CJson_Document& req_doc);
105 
107 {
108  switch (t) {
109  case eStart: return "Start";
110  case eSubmit: return "Submit";
111  case eReply: return "Reply";
112  case eDone: return "Done";
113  case eSend: return "Send";
114  case eReceive: return "Receive";
115  case eClose: return "Close";
116  case eRetry: return "Retry";
117  case eFail: return "Fail";
118 
119  case eLastType:
120  case eError: break;
121  }
122 
123  _TROUBLE;
124  return "ERROR";
125 }
126 
127 const char* s_StrStatus(EPSG_Status status)
128 {
129  switch (status) {
130  case EPSG_Status::eSuccess: return "Success";
131  case EPSG_Status::eInProgress: return "InProgress";
132  case EPSG_Status::eNotFound: return "NotFound";
133  case EPSG_Status::eCanceled: return "Canceled";
134  case EPSG_Status::eForbidden: return "Forbidden";
135  case EPSG_Status::eError: break;
136  }
137 
138  return "Error";
139 }
140 
142 {
143  stringstream ss;
144  doc.Write(ss, m_Flags);
145 
146  unique_lock<mutex> lock(m_Mutex);
147 
148  if (m_Pipe) {
149  cout << ss.rdbuf() << endl;
150  } else {
151  cout << m_Separator << '\n' << ss.rdbuf() << flush;
152  m_Separator = ',';
153  }
154 
155  return *this;
156 }
157 
159 {
160  // If not in pipe mode and printed some JSON
161  if (!m_Pipe && (m_Separator == ',')) {
162  cout << "\n]" << endl;
163  }
164 }
165 
166 template <class... TArgs>
167 CJsonResponse::CJsonResponse(EPSG_Status status, TArgs&&... args) :
168  m_JsonObj(SetObject())
169 {
170  try {
171  FillWithRequestID(status, std::forward<TArgs>(args)...);
172  }
173  catch (exception& e) {
174  CJson_Document new_doc;
175  new_doc.ResetObject()["data"].AssignCopy(*this);
176  AssignCopy(new_doc);
177  Set("code", eJsonRpc_ExceptionOnRead);
178  Set("message", e.what());
179  m_Error = true;
180  }
181 }
182 
184  CJsonResponse(id)
185 {
186  m_JsonObj[result.m_Error ? "error" : "result"].AssignCopy(result);
187 }
188 
189 CJsonResponse::CJsonResponse(const string& id, int code, const string& message) :
190  CJsonResponse(id)
191 {
192  CJson_Object error_obj = m_JsonObj.insert_object("error");
193  Set(error_obj["code"], code);
194  Set(error_obj["message"], message);
195 }
196 
197 CJsonResponse::CJsonResponse(const string& id) :
198  m_JsonObj(SetObject())
199 {
200  Set("jsonrpc", "2.0");
201 
202  auto id_value = m_JsonObj["id"].SetValue();
203 
204  if (id.empty()) {
205  id_value.SetNull();
206  } else {
207  id_value.SetString(id);
208  }
209 }
210 
211 template <class TItem>
213 {
214  Fill(status, std::move(item));
215 }
216 
217 template <class TItem, class... TArgs>
218 void CJsonResponse::FillWithRequestID(EPSG_Status status, TItem item, TArgs&&... args)
219 {
220  if (auto request_id = s_GetReply(item)->GetRequest()->template GetUserContext<string>()) {
221  Set("request_id", *request_id);
222  }
223 
224  Fill(status, std::move(item), std::forward<TArgs>(args)...);
225 }
226 
227 const char* s_GetItemName(CPSG_ReplyItem::EType type, bool trouble = true)
228 {
229  switch (type) {
230  case CPSG_ReplyItem::eBlobData: return "BlobData";
231  case CPSG_ReplyItem::eBlobInfo: return "BlobInfo";
232  case CPSG_ReplyItem::eSkippedBlob: return "SkippedBlob";
233  case CPSG_ReplyItem::eBioseqInfo: return "BioseqInfo";
234  case CPSG_ReplyItem::eNamedAnnotInfo: return "NamedAnnotInfo";
235  case CPSG_ReplyItem::eNamedAnnotStatus: return "NamedAnnotStatus";
236  case CPSG_ReplyItem::ePublicComment: return "PublicComment";
237  case CPSG_ReplyItem::eProcessor: return "Processor";
238  case CPSG_ReplyItem::eIpgInfo: return "IpgInfo";
239  case CPSG_ReplyItem::eEndOfReply: if (!trouble) return "Reply"; _TROUBLE;
240  }
241 
242  return "UnknownItem";
243 }
244 
245 CJsonResponse CJsonResponse::NewItem(const shared_ptr<CPSG_ReplyItem>& reply_item)
246 {
247  CJsonResponse rv;
248  rv.Set("reply", "NewItem");
249  rv.Set("item", s_GetItemName(reply_item->GetType()));
250  rv.Set("status", s_StrStatus(reply_item->GetStatus(CDeadline::eNoWait)));
251  return rv;
252 }
253 
254 void CJsonResponse::Fill(EPSG_Status reply_item_status, shared_ptr<CPSG_ReplyItem> reply_item)
255 {
256  auto reply_item_type = reply_item->GetType();
257 
258  if (sm_SetReplyType) {
259  Set("reply", s_GetItemName(reply_item_type));
260  }
261 
262  if (SParams::verbose) {
263  Set("processor_id", reply_item->GetProcessorId());
264  }
265 
266  if (reply_item_status != EPSG_Status::eSuccess) {
267  return Fill(reply_item, reply_item_status);
268  }
269 
270  switch (reply_item_type) {
272  return Fill(static_pointer_cast<CPSG_BlobData>(reply_item));
273 
275  return Fill(static_pointer_cast<CPSG_BlobInfo>(reply_item));
276 
278  return Fill(static_pointer_cast<CPSG_SkippedBlob>(reply_item));
279 
281  return Fill(static_pointer_cast<CPSG_BioseqInfo>(reply_item));
282 
284  return Fill(static_pointer_cast<CPSG_NamedAnnotInfo>(reply_item));
285 
287  return Fill(static_pointer_cast<CPSG_NamedAnnotStatus>(reply_item));
288 
290  return Fill(static_pointer_cast<CPSG_PublicComment>(reply_item));
291 
293  return Fill(static_pointer_cast<CPSG_Processor>(reply_item));
294 
296  return Fill(static_pointer_cast<CPSG_IpgInfo>(reply_item));
297 
299  _TROUBLE;
300  return;
301  }
302 
303  throw logic_error("Received unknown item: " + to_string(reply_item_type));
304 }
305 
306 auto s_IsRawRequest(shared_ptr<const CPSG_Request>& request)
307 {
308  return (request->GetType() == CPSG_Request::eBlob) && dynamic_pointer_cast<const CRawRequest>(request);
309 }
310 
311 auto s_IsRawResponse(const CPSG_BlobId* blob_id)
312 {
313  return blob_id && (blob_id->GetLastModified() == CPSG_BlobId::TLastModified(numeric_limits<Int8>::min()));
314 }
315 
316 void CJsonResponse::Fill(shared_ptr<CPSG_BlobData> blob_data)
317 {
318  if (auto request = blob_data->GetReply()->GetRequest(); s_IsRawRequest(request)) {
319  if (auto blob_id = blob_data->GetId<CPSG_BlobId>(); s_IsRawResponse(blob_id)) {
320  if (CJson_Document json_doc; json_doc.Read(blob_data->GetStream())) {
321  Set("reply", blob_id->GetId());
322 
323  for (const auto& p : json_doc.GetObject()) {
324  m_JsonObj.insert(p.name, p.value);
325  }
326 
327  return;
328  }
329  }
330  }
331 
332  Set("id", blob_data);
333  ostringstream os;
334  os << blob_data->GetStream().rdbuf();
335  auto data = os.str();
336 
337  if (const auto data_size = data.size(); data_size <= sm_DataLimit) {
338  Set("data", NStr::Base64Encode(data));
339  } else {
340  Set("length", static_cast<Uint8>(data_size)); // macOS requires static_cast, Set() would be ambiguous otherwise
341  CHash hash;
342  hash.Calculate(data.data(), data_size);
343  Set("hash", hash.GetResultHex());
344  data.erase(sm_PreviewSize);
345  Set("preview", NStr::Base64Encode(data));
346  }
347 }
348 
349 void CJsonResponse::Fill(shared_ptr<CPSG_BlobInfo> blob_info)
350 {
351  Set("id", blob_info);
352  Set("compression", blob_info->GetCompression());
353  Set("format", blob_info->GetFormat());
354  Set("storage_size", blob_info->GetStorageSize());
355  Set("size", blob_info->GetSize());
356  Set("is_dead", blob_info->IsDead());
357  Set("is_suppressed", blob_info->IsSuppressed());
358  Set("is_withdrawn", blob_info->IsWithdrawn());
359  Set("hup_release_date", blob_info->GetHupReleaseDate().AsString());
360  Set("owner", blob_info->GetOwner());
361  Set("original_load_date", blob_info->GetOriginalLoadDate().AsString());
362  Set("class", objects::CBioseq_set::ENUM_METHOD_NAME(EClass)()->FindName(blob_info->GetClass(), true));
363  Set("division", blob_info->GetDivision());
364  Set("username", blob_info->GetUsername());
365  Set("id2_info", blob_info->GetId2Info());
366  Set("n_chunks", blob_info->GetNChunks());
367 }
368 
370 {
371  switch (reason) {
372  case CPSG_SkippedBlob::eExcluded: return "Excluded";
373  case CPSG_SkippedBlob::eInProgress: return "InProgress";
374  case CPSG_SkippedBlob::eSent: return "Sent";
375  case CPSG_SkippedBlob::eUnknown: return "Unknown";
376  };
377 
378  _TROUBLE;
379  return "";
380 }
381 
382 void CJsonResponse::Fill(shared_ptr<CPSG_SkippedBlob> skipped_blob)
383 {
384  Set("id", skipped_blob);
385  Set("reason", s_ReasonToString(skipped_blob->GetReason()));
386 
387  const auto& sent_seconds_ago = skipped_blob->GetSentSecondsAgo();
388 
389  if (!sent_seconds_ago.IsNull()) {
390  Set("sent_seconds_ago", sent_seconds_ago.GetValue());
391  }
392 
393  const auto& time_until_resend = skipped_blob->GetTimeUntilResend();
394 
395  if (!time_until_resend.IsNull()) {
396  Set("time_until_resend", time_until_resend.GetValue());
397  }
398 }
399 
400 void CJsonResponse::Fill(shared_ptr<CPSG_BioseqInfo> bioseq_info)
401 {
402  const auto included_info = bioseq_info->IncludedInfo();
403 
404  if (included_info & CPSG_Request_Resolve::fCanonicalId) Set("canonical_id", bioseq_info->GetCanonicalId());
405  if (included_info & CPSG_Request_Resolve::fOtherIds) Set("other_ids", bioseq_info->GetOtherIds());
406  if (included_info & CPSG_Request_Resolve::fMoleculeType) Set("molecule_type", objects::CSeq_inst::ENUM_METHOD_NAME(EMol)()->FindName(bioseq_info->GetMoleculeType(), true));
407  if (included_info & CPSG_Request_Resolve::fLength) Set("length", bioseq_info->GetLength());
408  if (included_info & CPSG_Request_Resolve::fChainState) Set("seq_state", bioseq_info->GetChainState());
409  if (included_info & CPSG_Request_Resolve::fState) Set("state", bioseq_info->GetState());
410  if (included_info & CPSG_Request_Resolve::fBlobId) Set("blob_id", bioseq_info->GetBlobId());
411  if (included_info & CPSG_Request_Resolve::fTaxId) Set("tax_id", TAX_ID_TO(Int8, bioseq_info->GetTaxId()));
412  if (included_info & CPSG_Request_Resolve::fHash) Set("hash", bioseq_info->GetHash());
413  if (included_info & CPSG_Request_Resolve::fDateChanged) Set("date_changed", bioseq_info->GetDateChanged().AsString());
414  if (included_info & CPSG_Request_Resolve::fGi) Set("gi", GI_TO(Int8, bioseq_info->GetGi()));
415 }
416 
417 void CJsonResponse::Fill(shared_ptr<CPSG_NamedAnnotInfo> named_annot_info)
418 {
419  Set("name", named_annot_info->GetName());
420  Set("blob_id", named_annot_info->GetBlobId());
421  Set("id2_annot_info", named_annot_info->GetId2AnnotInfo());
422 }
423 
424 void CJsonResponse::Fill(shared_ptr<CPSG_NamedAnnotStatus> named_annot_status)
425 {
426  auto ar = m_JsonObj["statuses"].ResetArray();
427 
428  for (const auto& status : named_annot_status->GetId2AnnotStatusList()) {
429  ar.push_back();
430  auto obj = ar.back().ResetObject();
431  Set(obj["name"], status.first);
432  Set(obj["status"], s_StrStatus(status.second));
433  }
434 }
435 
436 void CJsonResponse::Fill(shared_ptr<CPSG_PublicComment> public_comment)
437 {
438  Set("id", public_comment);
439  Set("text", public_comment->GetText());
440 }
441 
443 {
444  switch (progress_status) {
445  case CPSG_Processor::eStart: return "start";
446  case CPSG_Processor::eDone: return "done";
447  case CPSG_Processor::eNotFound: return "not_found";
448  case CPSG_Processor::eCanceled: return "canceled";
449  case CPSG_Processor::eTimeout: return "timeout";
450  case CPSG_Processor::eError: return "error";
451  case CPSG_Processor::eUnauthorized: return "unauthorized";
452  case CPSG_Processor::eUnknown: return "unknown";
453  }
454 
455  _TROUBLE;
456  return "";
457 }
458 
459 void CJsonResponse::Fill(shared_ptr<CPSG_Processor> processor)
460 {
461  const auto progress_status = processor->GetProgressStatus();
462 
463  // Progress status does not make sense without processor ID,
464  // the latter is reported in the verbose mode elsewhere
465  if (!SParams::verbose) {
466  Set("processor_id", processor->GetProcessorId());
467  }
468 
469  Set("progress_status", s_ProgressStatusToString(progress_status));
470 }
471 
472 void CJsonResponse::Fill(shared_ptr<CPSG_IpgInfo> ipg_info)
473 {
474  Set("protein", ipg_info->GetProtein());
475  Set("ipg", ipg_info->GetIpg());
476  Set("nucleotide", ipg_info->GetNucleotide());
477  Set("tax_id", ipg_info->GetTaxId());
478  Set("gb_state", ipg_info->GetGbState());
479 }
480 
481 template <class TItem>
482 void CJsonResponse::Fill(EPSG_Status status, TItem item, string first_message)
483 {
484  Set("status", s_StrStatus(status));
485 
486  for (auto message = std::move(first_message); !message.empty(); message = item->GetNextMessage()) {
487  if (auto errors = m_JsonObj["errors"]; errors.IsNull()) {
488  errors.ResetArray().push_back(message);
489  } else {
490  errors.SetArray().push_back(message);
491  }
492  }
493 }
494 
495 void CJsonResponse::Set(CJson_Node node, const CPSG_BioId& bio_id)
496 {
497  auto obj = node.ResetObject();
498  Set(obj["id"], bio_id.GetId());
499 
500  const auto& type = bio_id.GetType();
501 
503  Set(obj["type"], type);
504  }
505 }
506 
507 void CJsonResponse::Set(CJson_Node node, const vector<CPSG_BioId>& bio_ids)
508 {
509  auto ar = node.ResetArray();
510 
511  for (const auto& bio_id : bio_ids) {
512  ar.push_back();
513  Set(ar.back(), bio_id);
514  }
515 }
516 
517 void CJsonResponse::Set(CJson_Node node, const CPSG_BlobId& blob_id)
518 {
519  auto obj = node.ResetObject();
520  Set(obj["id"], blob_id.GetId());
521 
522  const auto& last_modified = blob_id.GetLastModified();
523 
524  if (!last_modified.IsNull()) {
525  Set(obj["last_modified"], last_modified.GetValue());
526  }
527 }
528 
529 void CJsonResponse::Set(CJson_Node node, const CPSG_ChunkId& chunk_id)
530 {
531  auto obj = node.ResetObject();
532  Set(obj["id2_chunk"], chunk_id.GetId2Chunk());
533  Set(obj["id2_info"], chunk_id.GetId2Info());
534 }
535 
536 string SInteractiveParams::GetService(string service, bool one_server)
537 {
538  if (one_server) {
539  auto servers = CServiceDiscovery(service.empty() ? TPSG_Service::GetDefault() : service)();
540 
541  if (!servers.empty()) {
542  sort(servers.begin(), servers.end(), [](const auto& l, const auto& r) { return l.second < r.second; });
543  return servers.back().first.AsString();
544  }
545  }
546 
547  return service;
548 }
549 
550 void SMetrics::OutputItems(ostream& os) const
551 {
552  for (const auto& item : m_Items) {
553  os << '\t' << s_GetItemName(item.first, false) << '=' << s_StrStatus(item.second);
554  }
555 }
556 
558 {
560 
561  void ItemComplete(EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item);
562  void ReplyComplete(EPSG_Status status, const shared_ptr<CPSG_Reply>& reply);
563 
564  operator int() const { return int(m_Status); }
565 
566 private:
567  void Process(shared_ptr<CPSG_BlobInfo> blob_info);
568  void Process(shared_ptr<CPSG_BlobData> blob_data);
569  void Process(shared_ptr<CPSG_NamedAnnotInfo> named_annot_info);
570  void Process(shared_ptr<CPSG_BlobInfo> blob_info, shared_ptr<CPSG_BlobData> blob_data);
571 
572  template <class TItem>
573  bool ReportErrors(EPSG_Status status, TItem item, const char* prefix);
574 
576  unordered_map<string, pair<shared_ptr<CPSG_BlobInfo>, shared_ptr<CPSG_BlobData>>> m_Data;
578 };
579 
580 template <class TItem>
581 bool SDataOnlyCopy::ReportErrors(EPSG_Status status, TItem item, const char* prefix)
582 {
583  if (status == EPSG_Status::eSuccess) {
584  return false;
585  }
586 
587  if (m_Status == EPSG_Status::eSuccess) m_Status = status;
588 
589  stringstream ss;
590 
591  if (m_Params.messages_only) {
592  auto delim = "";
593 
594  for (;;) {
595  auto message = item->GetNextMessage();
596 
597  if (message.empty()) break;
598 
599  ss << delim << message;
600  delim = ", ";
601  }
602  } else {
603  ss << prefix << s_StrStatus(status);
604 
605  for (;;) {
606  auto message = item->GetNextMessage();
607 
608  if (message.empty()) break;
609 
610  ss << "\n\t" << message;
611  }
612 
613  ss << '\n';
614  }
615 
616  cerr << ss.rdbuf();
617  return true;
618 }
619 
620 void SDataOnlyCopy::ItemComplete(EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
621 {
622  if (ReportErrors(status, item, "Item error: ")) {
623  return;
624 
625  } else if (item->GetType() == CPSG_ReplyItem::eBlobInfo) {
626  Process(static_pointer_cast<CPSG_BlobInfo>(item));
627 
628  } else if (item->GetType() == CPSG_ReplyItem::eBlobData) {
629  Process(static_pointer_cast<CPSG_BlobData>(item));
630 
631  } else if (item->GetType() == CPSG_ReplyItem::eNamedAnnotInfo) {
632  Process(static_pointer_cast<CPSG_NamedAnnotInfo>(item));
633  }
634 }
635 
636 void SDataOnlyCopy::ReplyComplete(EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
637 {
638  ReportErrors(status, reply, "Reply error: ");
639 }
640 
642 {
643  if (format == "asn1-text") return eSerial_AsnText;
644  if (format == "xml") return eSerial_Xml;
645  if (format == "json") return eSerial_Json;
646 
647  return eSerial_AsnBinary;
648 }
649 
650 TTypeInfo s_GetInputType(const shared_ptr<CPSG_BlobData>& blob_data)
651 {
652  using namespace objects;
653 
654  if (auto chunk_id = blob_data->GetId<CPSG_ChunkId>()) {
655  return chunk_id->GetId2Chunk() == 999999999 ? CID2S_Split_Info::GetTypeInfo() : CID2S_Chunk::GetTypeInfo();
656  }
657 
658  return CSeq_entry::GetTypeInfo();
659 }
660 
661 void SDataOnlyCopy::Process(shared_ptr<CPSG_BlobInfo> blob_info)
662 {
663  auto& p = m_Data[blob_info->GetId()->Repr()];
664 
665  if (p.second) {
666  Process(std::move(blob_info), std::move(p.second));
667  } else {
668  p.first = std::move(blob_info);
669  }
670 }
671 
672 void SDataOnlyCopy::Process(shared_ptr<CPSG_BlobData> blob_data)
673 {
674  auto& p = m_Data[blob_data->GetId()->Repr()];
675 
676  if (p.first) {
677  Process(std::move(p.first), std::move(blob_data));
678  } else {
679  p.second = std::move(blob_data);
680  }
681 }
682 
683 void SDataOnlyCopy::Process(shared_ptr<CPSG_BlobInfo> blob_info, shared_ptr<CPSG_BlobData> blob_data)
684 {
685  auto& is = blob_data->GetStream();
686 
688  cout << is.rdbuf();
689  return;
690  }
691 
692  auto input_format = s_GetInputFormat(blob_info->GetFormat());
693  unique_ptr<CObjectIStream> in;
694 
695  if (blob_info->GetCompression().find("zip") == string::npos) {
696  in.reset(CObjectIStream::Open(input_format, is));
697  } else {
698  unique_ptr<CZipStreamDecompressor> zip(new CZipStreamDecompressor);
699  unique_ptr<CCompressionIStream> compressed_is(new CCompressionIStream(is, zip.release(), CCompressionIStream::fOwnProcessor));
700  in.reset(CObjectIStream::Open(input_format, *compressed_is.release(), eTakeOwnership));
701  }
702 
703  _ASSERT(in);
704  in->UseMemoryPool();
705 
706  stringstream ss;
707  unique_ptr<CObjectOStream> out(CObjectOStream::Open(m_Params.output_format, ss));
708  CObjectStreamCopier copier(*in, *out);
709 
710  try {
711  copier.Copy(s_GetInputType(blob_data));
712  }
713  catch (CException& ex) {
714  cerr << "Failed to process blob '" << blob_data->GetId()->Repr() << "': " << ex.ReportThis() << endl;
715  return;
716  }
717 
718  cout << ss.rdbuf();
719 }
720 
721 void SDataOnlyCopy::Process(shared_ptr<CPSG_NamedAnnotInfo> named_annot_info)
722 {
724  cout << NStr::Base64Decode(named_annot_info->GetId2AnnotInfo());
725  return;
726  }
727 
728  for (const auto& info : named_annot_info->GetId2AnnotInfoList() ) {
730  }
731 }
732 
733 template <class... TArgs>
735 {
736  static void ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
737  {
738  json_out << CJsonResponse(status, item);
739  }
740 
741  static void ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
742  {
743  json_out << CJsonResponse(status, reply);
744  }
745 };
746 
747 template <class... TArgs>
748 struct SNonVerbose : SNonVerboseBase<TArgs...>
749 {
750  using SNonVerboseBase<TArgs...>::SNonVerboseBase;
751 
752  void ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item);
753  void ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply);
754 
755 private:
756  using TBase = SNonVerboseBase<TArgs...>;
757 
759 };
760 
761 template <class... TArgs>
762 void SNonVerbose<TArgs...>::ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
763 {
764  const auto type = item->GetType();
765 
766  if (status == EPSG_Status::eNotFound) {
767  m_Items.try_emplace(type, item);
768  } else {
769  TBase::ItemComplete(json_out, status, item);
770 
771  if (auto [it, result] = m_Items.try_emplace(type, nullptr); !result && it->second) {
772  it->second.reset();
773  }
774  }
775 }
776 
777 template <class... TArgs>
778 void SNonVerbose<TArgs...>::ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
779 {
780  for (const auto& p : m_Items) {
781  if (auto& item = p.second) {
782  TBase::ItemComplete(json_out, EPSG_Status::eNotFound, item);
783  }
784  }
785 
786  if (status != EPSG_Status::eSuccess) {
787  TBase::ReplyComplete(json_out, status, reply);
788  }
789 }
790 
791 template <class... TArgs>
792 void s_ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
793 {
794  json_out << CJsonResponse(status, item);
795 }
796 
797 template <class... TArgs>
798 void s_ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
799 {
800  json_out << CJsonResponse(status, reply);
801 }
802 
803 template <>
804 void s_ReplyComplete<SOneRequestParams>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
805 {
806  if (status != EPSG_Status::eSuccess) {
807  s_ReplyComplete<>(json_out, status, reply);
808  }
809 }
810 
811 template <class... TArgs>
812 void s_NewItem(SJsonOut&, const shared_ptr<CPSG_ReplyItem>&)
813 {
814 }
815 
816 template <>
817 void s_NewItem<SOneRequestParams>(SJsonOut& json_out, const shared_ptr<CPSG_ReplyItem>& reply_item)
818 {
819  json_out << CJsonResponse::NewItem(reply_item);
820 }
821 
822 int CProcessing::OneRequest(const SOneRequestParams& params, shared_ptr<CPSG_Request> request)
823 {
824  SDataOnlyCopy data_only_copy(params.data_only);
825  CLogLatencyReport latency_report{
826  "/objtools/pubseq_gateway/client/",
827  R"(\S+: (\S+:[0-9]+)/\S+?\S+&client_id=\S+)",
828  R"(\S+: Closed with status \S+)",
829  R"(\S+: \S+&item_type=reply&\S+&exec_time=([0-9]+)\\n)"
830  };
831 
832  if (params.latency.which) {
833  latency_report.Start(params.latency.which);
834  latency_report.SetDebug(params.latency.debug);
835  TPSG_DebugPrintout::SetDefault(TPSG_DebugPrintout::TValue::eSome);
836  }
837 
838  CPSG_EventLoop queue;
839  SJsonOut json_out;
840  SNonVerbose<> non_verbose;
841 
842  using namespace placeholders;
843 
844  if (params.data_only.enabled) {
845  auto item_complete = bind(&SDataOnlyCopy::ItemComplete, &data_only_copy, _1, _2);
846  auto reply_complete = bind(&SDataOnlyCopy::ReplyComplete, &data_only_copy, _1, _2);
847  queue = CPSG_EventLoop(params.service, item_complete, reply_complete);
848  } else if (!params.verbose) {
849  auto item_complete = bind(&SNonVerbose<>::ItemComplete, &non_verbose, ref(json_out), _1, _2);
850  auto reply_complete = bind(&SNonVerbose<>::ReplyComplete, &non_verbose, ref(json_out), _1, _2);
851  queue = CPSG_EventLoop(params.service, item_complete, reply_complete);
852  } else {
853  auto item_complete = bind(&s_ItemComplete<SOneRequestParams>, ref(json_out), _1, _2);
854  auto reply_complete = bind(&s_ReplyComplete<SOneRequestParams>, ref(json_out), _1, _2);
855  auto new_item = bind(&s_NewItem<SOneRequestParams>, ref(json_out), _1);
856  queue = CPSG_EventLoop(params.service, item_complete, reply_complete, new_item);
857  }
858 
860  _VERIFY(queue.SendRequest(request, CDeadline::eInfinite));
861  queue.Stop();
863  return data_only_copy;
864 }
865 
866 template <>
868 {
870  SNonVerboseBase(string id) : string(std::move(id)) {}
871 
872  void ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item);
873  void ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply);
874 };
875 
876 template <class TParams>
878 {
880 }
881 
882 template <>
884 {
885  string id;
886 
887  while (m_InputQueue.Pop(id)) {
888  _ASSERT(!id.empty()); // ReadLine makes sure it's not empty
889  auto bio_id = CPSG_BioId(id, m_Params.type);
890  auto user_context = make_shared<SNonVerbose<SBatchResolveParams>>(std::move(id));
891  auto request = make_shared<CPSG_Request_Resolve>(std::move(bio_id), m_Params.bio_id_resolution, std::move(user_context));
892 
893  request->IncludeInfo(m_Params.include_info);
894  request->SetAccSubstitution(m_Params.acc_substitution);
895 
896  _VERIFY(output.SendRequest(std::move(request), CDeadline::eInfinite));
897  }
898 
899  output.Stop();
900 }
901 
902 using verbose = true_type;
903 using no_verbose = false_type;
904 
905 template <>
906 void s_ItemComplete<no_verbose>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
907 {
908  auto context = item->GetReply()->GetRequest()->GetUserContext<SNonVerbose<SBatchResolveParams>>();
909  _ASSERT(context);
910  context->ItemComplete(json_out, status, item);
911 }
912 
913 void SNonVerboseBase<SBatchResolveParams>::ItemComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
914 {
915  reported.emplace(status);
916  s_ItemComplete<verbose>(json_out, status, item);
917 }
918 
919 template <>
920 void s_ReplyComplete<no_verbose>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
921 {
922  auto context = reply->GetRequest()->GetUserContext<SNonVerbose<SBatchResolveParams>>();
923  _ASSERT(context);
924  context->ReplyComplete(json_out, status, reply);
925 }
926 
927 void SNonVerboseBase<SBatchResolveParams>::ReplyComplete(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
928 {
929  if (auto first_message = reply->GetNextMessage(); !first_message.empty() || (reported.find(status) == reported.end())) {
930  CJsonResponse result_doc(status, reply, first_message);
931  json_out << result_doc;
932  }
933 }
934 
935 template <class TParams>
937 {
938  // MS VS does not like the ternary operator here anymore
939  if (SParams::verbose) {
940  return &s_ItemComplete<verbose>;
941  } else {
943  }
944 }
945 
946 template <class TParams>
948 {
949  // MS VS does not like the ternary operator here anymore
950  if (SParams::verbose) {
951  return &s_ReplyComplete<verbose>;
952  } else {
954  }
955 }
956 
957 template <class TParams>
959 {
960  return &s_NewItem<TParams>;
961 }
962 
963 template <>
965 {
966  string line, protein, n;
967 
968  while (m_InputQueue.Pop(line)) {
969  _ASSERT(!line.empty()); // ReadLine makes sure it's not empty
970 
971  auto nucleotide = NStr::SplitInTwo(line, ",", protein, n) ? CPSG_Request_IpgResolve::TNucleotide(n) : null;
972  auto user_context = make_shared<SNonVerbose<SBatchResolveParams>>(std::move(line));
973  auto request = make_shared<CPSG_Request_IpgResolve>(std::move(protein), 0, std::move(nucleotide), std::move(user_context));
974 
975  _VERIFY(output.SendRequest(std::move(request), CDeadline::eInfinite));
976  }
977 
978  output.Stop();
979 }
980 
981 template <>
983 {
986 }
987 
988 template <>
990 {
991  CJson_Document json_schema_doc(CProcessing::RequestSchema());
992  CJson_Schema json_schema(json_schema_doc);
993  string line;
994 
995  while (m_InputQueue.Pop(line)) {
996  _ASSERT(!line.empty()); // ReadLine makes sure it's not empty
997 
998  CJson_Document json_doc;
999 
1000  if (!json_doc.ParseString(line)) {
1001  json_out << CJsonResponse(s_GetId(json_doc), eJsonRpc_ParseError, json_doc.GetReadError());
1002  } else if (!json_schema.Validate(json_doc)) {
1003  json_out << CJsonResponse(s_GetId(json_doc), eJsonRpc_InvalidRequest, json_schema.GetValidationError());
1004  } else {
1005  if (m_Params.echo) json_out << json_doc;
1006 
1007  CJson_ConstObject json_obj(json_doc.GetObject());
1008  auto method = json_obj["method"].GetValue().GetString();
1009  auto id = json_obj["id"].GetValue().GetString();
1010  auto params_obj = json_obj["params"].GetObject();
1011  auto user_context = make_shared<string>(id);
1012  auto request_context = m_Params.testing ? null : SInteractiveNewRequestStart(method, params_obj).Get();
1013 
1014  if (auto request = SRequestBuilder::Build(method, params_obj, std::move(user_context), std::move(request_context))) {
1015  _VERIFY(output.SendRequest(std::move(request), CDeadline::eInfinite));
1016  }
1017  }
1018  }
1019 
1020  output.Stop();
1021 }
1022 
1023 using testing = true_type;
1024 using no_testing = false_type;
1025 using server_mode = true_type;
1026 using no_server_mode = false_type;
1027 
1028 template <>
1029 void s_ReplyComplete<SInteractiveParams, testing, server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1030 {
1031  const auto request = reply->GetRequest();
1032  const auto& request_id = *request->GetUserContext<string>();
1033 
1034  CJsonResponse result_doc(status, reply, CJsonResponse::eDoNotAddRequestID);
1035  json_out << CJsonResponse(request_id, result_doc);
1036 }
1037 
1038 template <>
1039 void s_ReplyComplete<SInteractiveParams, testing, no_server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1040 {
1041  if (status != EPSG_Status::eSuccess) {
1043  }
1044 }
1045 
1046 template <>
1047 void s_ReplyComplete<SInteractiveParams, no_testing, server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1048 {
1049  const auto request = reply->GetRequest();
1050  CRequestContextGuard_Base guard(request->GetRequestContext()->Clone());
1051  guard.SetStatus(s_PsgStatusToRequestStatus(status));
1052 
1054 }
1055 
1056 template <>
1057 void s_ReplyComplete<SInteractiveParams, no_testing, no_server_mode>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_Reply>& reply)
1058 {
1059  const auto request = reply->GetRequest();
1060  CRequestContextGuard_Base guard(request->GetRequestContext()->Clone());
1061  guard.SetStatus(s_PsgStatusToRequestStatus(status));
1062 
1064 }
1065 
1066 template <>
1067 void s_ItemComplete<SInteractiveParams>(SJsonOut& json_out, EPSG_Status status, const shared_ptr<CPSG_ReplyItem>& item)
1068 {
1069  const auto request = item->GetReply()->GetRequest();
1070  const auto& request_id = *request->GetUserContext<string>();
1071 
1072  CJsonResponse result_doc(status, item, CJsonResponse::eDoNotAddRequestID);
1073  json_out << CJsonResponse(request_id, result_doc);
1074 }
1075 
1076 template <>
1078 {
1080 }
1081 
1082 template <>
1084 {
1085  if (m_Params.testing) {
1086  // MS VS does not like the ternary operator here anymore
1087  if (m_Params.server) {
1089  } else {
1091  }
1092  } else {
1093  // MS VS does not like the ternary operator here anymore
1094  if (m_Params.server) {
1096  } else {
1098  }
1099  }
1100 }
1101 
1102 template <>
1103 void s_NewItem<SInteractiveParams, verbose>(SJsonOut& json_out, const shared_ptr<CPSG_ReplyItem>& item)
1104 {
1105  const auto request = item->GetReply()->GetRequest();
1106  const auto& request_id = *request->GetUserContext<string>();
1107 
1108  auto result_doc = CJsonResponse::NewItem(item);
1109  json_out << CJsonResponse(request_id, result_doc);
1110 }
1111 
1112 template <>
1114 {
1115  // MS VS does not like the ternary operator here anymore
1116  if (SParams::verbose) {
1118  } else {
1119  return &s_NewItem<SInteractiveParams, no_verbose>;
1120  }
1121 }
1122 
1123 template <class TParams>
1125  json_out(params.pipe, params.server),
1126  m_Params(params)
1127 {
1128  Init(params);
1129 }
1130 
1131 template <class TParams>
1133  m_Impl(params)
1134 {
1135  using namespace placeholders;
1136  auto item_complete = bind(m_Impl.GetItemComplete(), ref(m_Impl.json_out), _1, _2);
1137  auto reply_complete = bind(m_Impl.GetReplyComplete(), ref(m_Impl.json_out), _1, _2);
1138  auto new_item = bind(m_Impl.GetNewItem(), ref(m_Impl.json_out), _1);
1139 
1140  for (int n = params.worker_threads; n > 0; --n) {
1141  m_PsgQueues.emplace_back(params.service, item_complete, reply_complete, new_item);
1142  auto& queue = m_PsgQueues.back();
1143  queue.SetRequestFlags(params.request_flags);
1144  queue.SetUserArgs(params.user_args);
1145  m_Threads.emplace_back(&CPSG_EventLoop::Run, ref(queue), CDeadline::eInfinite);
1146  m_Threads.emplace_back(&SImpl::Submitter, &m_Impl, ref(queue));
1147  }
1148 }
1149 
1150 shared_ptr<CPSG_Reply> s_GetReply(shared_ptr<CPSG_ReplyItem>& item)
1151 {
1152  return item->GetReply();
1153 }
1154 
1155 shared_ptr<CPSG_Reply> s_GetReply(shared_ptr<CPSG_Reply>& reply)
1156 {
1157  return reply;
1158 }
1159 
1161 {
1162  switch (psg_status) {
1168  case EPSG_Status::eInProgress: _TROUBLE; break;
1169  }
1170 
1172 }
1173 
1174 string s_GetId(const CJson_Document& req_doc)
1175 {
1176  string id;
1177 
1178  if (req_doc.IsObject()) {
1179  auto req_obj = req_doc.GetObject();
1180 
1181  if (req_obj.has("id")) {
1182  auto id_node = req_obj["id"];
1183 
1184  if (id_node.IsValue()) {
1185  auto id_value = id_node.GetValue();
1186 
1187  if (id_value.IsString()) {
1188  id = id_value.GetString();
1189  }
1190  }
1191  }
1192  }
1193 
1194  return id;
1195 }
1196 
1197 template <class TCreateContext>
1198 vector<shared_ptr<CPSG_Request>> CProcessing::ReadCommands(TCreateContext create_context, size_t report_progress_after)
1199 {
1200  static CJson_Schema json_schema(RequestSchema());
1201  string line;
1202  vector<shared_ptr<CPSG_Request>> requests;
1203  unordered_set<string> ids;
1204 
1205  // Read requests from cin
1206  while (ReadLine(line)) {
1207  CJson_Document json_doc;
1208 
1209  if (!json_doc.ParseString(line)) {
1210  cerr << "Error in request '" << line << "': " << json_doc.GetReadError() << endl;
1211  return {};
1212  }
1213 
1214  auto id = s_GetId(json_doc);
1215 
1216  if (id.empty()) {
1217  cerr << "Error in request '" << line << "': no id or id is empty" << endl;
1218  return {};
1219  } else if (!json_schema.Validate(json_doc)) {
1220  cerr << "Error in request '" << id << "': " << json_schema.GetValidationError() << endl;
1221  return {};
1222  } else if (!ids.insert(id).second) {
1223  cerr << "Error in request '" << id << "': duplicate ID" << endl;
1224  return {};
1225  } else {
1226  CJson_ConstObject json_obj(json_doc.GetObject());
1227  auto method = json_obj["method"].GetValue().GetString();
1228  auto params_obj = json_obj["params"].GetObject();
1229  auto user_context = create_context(id, params_obj);
1230 
1231  if (!user_context) return {};
1232 
1233  if (auto request = SRequestBuilder::Build(method, params_obj, std::move(user_context))) {
1234  requests.emplace_back(std::move(request));
1235  if (report_progress_after && (requests.size() % report_progress_after == 0)) cerr << '.';
1236  }
1237  }
1238  }
1239 
1240  return requests;
1241 }
1242 
1244 {
1245  if (params.delay < 0.0) {
1246  cerr << "DELAY must be non-negative" << endl;
1247  return -1;
1248  }
1249 
1250  const size_t kReportProgressAfter = 2000;
1251 
1252  using TReplyStorage = deque<shared_ptr<CPSG_Reply>>;
1253 
1254  if (SParams::verbose) cerr << "Preparing requests: ";
1255  auto requests = ReadCommands([](string id, CJson_ConstNode&){ return make_shared<SMetrics>(std::move(id)); }, SParams::verbose ? kReportProgressAfter : 0);
1256 
1257  if (requests.empty()) return -1;
1258 
1259  atomic_size_t start(params.user_threads);
1260  atomic_int to_submit(static_cast<int>(requests.size()));
1261  auto wait = [&]() { while (start > 0) this_thread::sleep_for(chrono::microseconds(1)); };
1262 
1263  auto l = [&](CPSG_Queue& queue, TReplyStorage& replies) {
1264  start--;
1265  wait();
1266 
1267  for (;;) {
1268  auto i = to_submit--;
1269 
1270  if (i <= 0) break;
1271 
1272  // Submit and get response
1273  auto& request = requests[requests.size() - i];
1274  auto metrics = request->GetUserContext<SMetrics>();
1275 
1276  metrics->Set(SMetricType::eStart);
1277  auto reply = queue.SendRequestAndGetReply(request, CDeadline::eInfinite);
1278  metrics->Set(SMetricType::eSubmit);
1279 
1280  _ASSERT(reply);
1281 
1282  metrics->Set(SMetricType::eReply);
1283 
1284  for (;;) {
1285  auto reply_item = reply->GetNextItem(CDeadline::eInfinite);
1286  _ASSERT(reply_item);
1287 
1288  const auto type = reply_item->GetType();
1289 
1290  if (type == CPSG_ReplyItem::eEndOfReply) break;
1291 
1292  const auto status = reply_item->GetStatus(CDeadline::eInfinite);
1293  metrics->AddItem({type, status});
1294  }
1295 
1296  const auto status = reply->GetStatus(CDeadline::eInfinite);
1297  metrics->Set(SMetricType::eDone);
1298  metrics->AddItem({CPSG_ReplyItem::eEndOfReply, status});
1299 
1300  if (params.report_immediately) {
1301  // Metrics are reported on destruction
1302  metrics.reset();
1303  request.reset();
1304  reply.reset();
1305  } else {
1306  // Store the reply for now to prevent metrics from being written to cout (affects performance)
1307  replies.emplace_back(reply);
1308  }
1309 
1310  if (params.delay) {
1311  this_thread::sleep_for(chrono::duration<double>(params.delay));
1312  }
1313  }
1314  };
1315 
1316  vector<CPSG_Queue> queues;
1317  vector<TReplyStorage> replies(params.user_threads);
1318 
1319  for (size_t i = 0; i < (params.local_queue ? params.user_threads : 1); ++i) {
1320  queues.emplace_back(params.service);
1321  queues.back().SetRequestFlags(params.request_flags);
1322  queues.back().SetUserArgs(params.user_args);
1323  }
1324 
1325  vector<thread> threads;
1326  threads.reserve(params.user_threads);
1327 
1328  // Start threads in advance so it won't affect metrics
1329  for (size_t i = 0; i < params.user_threads; ++i) {
1330  threads.emplace_back(l, ref(queues[params.local_queue ? i : 0]), ref(replies[i]));
1331  }
1332 
1333  wait();
1334 
1335  // Start processing replies
1336  if (SParams::verbose) {
1337  cerr << "\nSubmitting requests: ";
1338  size_t previous = requests.size() / kReportProgressAfter;
1339 
1340  while (to_submit > 0) {
1341  size_t current = to_submit / kReportProgressAfter;
1342 
1343  for (auto i = current; i < previous; ++i) {
1344  cerr << '.';
1345  }
1346 
1347  previous = current;
1348  this_thread::sleep_for(chrono::milliseconds(100));
1349  }
1350 
1351  cerr << "\nWaiting for threads: " << params.user_threads << '\n';
1352  }
1353 
1354  for (auto& t : threads) {
1355  t.join();
1356  }
1357 
1358  // Release all replies held in queues, if any
1359  queues.clear();
1360 
1361  // Output metrics
1362  if (SParams::verbose) cerr << "Outputting metrics: ";
1363  requests.clear();
1364  size_t output = 0;
1365 
1366  for (auto& thread_replies : replies) {
1367  for (auto& reply : thread_replies) {
1368  reply.reset();
1369  if (SParams::verbose && (++output % kReportProgressAfter == 0)) cerr << '.';
1370  }
1371  }
1372 
1373  if (SParams::verbose) cerr << '\n';
1374  return 0;
1375 }
1376 
1377 bool CProcessing::ReadLine(string& line, istream& is)
1378 {
1379  for (;;) {
1380  if (!getline(is, line)) {
1381  return false;
1382  } else if (!line.empty()) {
1383  return true;
1384  }
1385  }
1386 }
1387 
1391 
1393 {
1394  // All JSON types have already been validated with the scheme
1395 
1396  auto context = params_obj.find("context");
1398 
1399  if (context != params_obj.end()) {
1400  auto context_obj = context->value.GetObject();
1401 
1402  auto sid = context_obj.find("sid");
1403 
1404  if (sid != context_obj.end()) {
1405  ctx.SetSessionID(sid->value.GetValue().GetString());
1406  }
1407 
1408  auto phid = context_obj.find("phid");
1409 
1410  if (phid != context_obj.end()) {
1411  ctx.SetHitID(phid->value.GetValue().GetString());
1412  }
1413 
1414  auto auth_token = context_obj.find("auth_token");
1415 
1416  if (auth_token != context_obj.end()) {
1417  ctx.SetProperty("auth_token", auth_token->value.GetValue().GetString());
1418  }
1419 
1420  auto client_ip = context_obj.find("client_ip");
1421 
1422  if (client_ip != context_obj.end()) {
1423  ctx.SetClientIP(client_ip->value.GetValue().GetString());
1424  }
1425  }
1426 
1427  if (!ctx.IsSetSessionID()) ctx.SetSessionID();
1428  if (!ctx.IsSetHitID()) ctx.SetHitID();
1429 
1430  SExtra extra;
1431  extra.Print("request", request);
1432  extra.Print("params", params_obj);
1433 }
1434 
1436 {
1437  _ASSERT(json.IsNumber());
1438 
1439  if (json.IsInt4()) {
1440  Print(prefix, json.GetInt4());
1441  } else if (json.IsUint4()) {
1442  Print(prefix, json.GetUint4());
1443  } else if (json.IsInt8()) {
1444  Print(prefix, json.GetInt8());
1445  } else if (json.IsUint8()) {
1446  Print(prefix, json.GetUint8());
1447  } else if (json.IsDouble()) {
1448  Print(prefix, json.GetDouble());
1449  } else {
1450  _TROUBLE;
1451  }
1452 }
1453 
1455 {
1456  for (size_t i = 0; i < json.size(); ++i) {
1457  Print(prefix + '[' + to_string(i) + ']', json[i]);
1458  }
1459 }
1460 
1462 {
1463  for (const auto& pair : json) {
1464  Print(prefix + '.' + pair.name, pair.value);
1465  }
1466 }
1467 
1469 {
1470  switch (json.GetType()) {
1472  Print(prefix, "<null>");
1473  break;
1474 
1476  Print(prefix, json.GetValue().GetBool() ? "true" : "false");
1477  break;
1478 
1480  Print(prefix, json.GetValue().GetString());
1481  break;
1482 
1484  Print(prefix, json.GetValue());
1485  break;
1486 
1488  Print(prefix, json.GetArray());
1489  break;
1490 
1492  Print(prefix, json.GetObject());
1493  break;
1494  };
1495 }
1496 
1497 int CProcessing::JsonCheck(istream* schema_is)
1498 {
1499  CJson_Document schema_doc;
1500 
1501  if (!schema_is) {
1502  schema_doc = RequestSchema();
1503 
1504  } else if (!schema_doc.Read(*schema_is)) {
1505  cerr << "Error on reading JSON schema: " << schema_doc.GetReadError() << endl;
1506  return -1;
1507 
1508  } else if (CJson_MetaSchema meta_schema; !meta_schema.Validate(schema_doc)) {
1509  cerr << "Error on validating JSON schema: " << meta_schema.GetValidationError() << endl;
1510  return -1;
1511  }
1512 
1513  CJson_Schema schema(schema_doc);
1514  string line;
1515  size_t line_no = 0;
1516  int rv = 0;
1517 
1518  while (ReadLine(line)) {
1519  CJson_Document input_doc;
1520  ++line_no;
1521 
1522  if (!input_doc.ParseString(line)) {
1523  cout << "Error on reading JSON document (" << line_no << "): " << input_doc.GetReadError() << endl;
1524  if (rv == 0) rv = -2;
1525  } else if (schema.Validate(input_doc)) {
1526  cout << "JSON document (" << line_no << ") is valid" << endl;
1527  } else {
1528  cout << "Error on validating JSON document (" << line_no << "): " << schema.GetValidationError() << endl;
1529  if (rv == 0) rv = -3;
1530  }
1531  }
1532 
1533  cout << line_no << " JSON document(s) have been checked" << endl;
1534  return rv;
1535 }
1536 
1538 {
1539  CObjectTypeInfo info(objects::CSeq_id::GetTypeInfo());
1540 
1541  if (auto index = info.FindVariantIndex(type)) return static_cast<CPSG_BioId::TType>(index);
1542  if (auto value = objects::CSeq_id::WhichInverseSeqId(type)) return value;
1543 
1544  return static_cast<CPSG_BioId::TType>(atoi(type.c_str()));
1545 }
1546 
1548 {
1549  auto id = array[0].GetValue().GetString();
1550 
1551  if (array.size() == 1) return CPSG_BioId(id);
1552 
1553  auto value = array[1].GetValue();
1554  auto type = value.IsString() ? GetBioIdType(value.GetString()) : static_cast<CPSG_BioId::TType>(value.GetInt4());
1555  return CPSG_BioId(id, type);
1556 }
1557 
1559 {
1560  CPSG_BioIds rv;
1561 
1562  if (input.has("bio_ids")) {
1563  auto bio_ids = input["bio_ids"].GetArray();
1564 
1565  for (const auto& bio_id : bio_ids) {
1566  rv.push_back(GetBioId(bio_id.GetArray()));
1567  }
1568  } else {
1569  rv.push_back(GetBioId(input["bio_id"].GetArray()));
1570  }
1571 
1572  return rv;
1573 }
1574 
1576 {
1577  auto array = input["blob_id"].GetArray();
1578  auto id = array[0].GetValue().GetString();
1579  return array.size() > 1 ? CPSG_BlobId(std::move(id), array[1].GetValue().GetInt8()) : std::move(id);
1580 }
1581 
1583 {
1584  auto array = input["chunk_id"].GetArray();
1585  return { static_cast<int>(array[0].GetValue().GetInt4()), array[1].GetValue().GetString() };
1586 }
1587 
1589 {
1590  auto na_array = input["named_annots"].GetArray();
1592 
1593  for (const auto& na : na_array) {
1594  names.push_back(na.GetValue().GetString());
1595  }
1596 
1597  return names;
1598 }
1599 
1600 template <>
1602 {
1603  const auto& info_flags = GetInfoFlags();
1604 
1605  auto i = info_flags.begin();
1606  bool all_info_except = specified(i->name);
1607  CPSG_Request_Resolve::TIncludeInfo include_info = all_info_except ? CPSG_Request_Resolve::fAllInfo : CPSG_Request_Resolve::TIncludeInfo(0);
1608 
1609  for (++i; i != info_flags.end(); ++i) {
1610  if (specified(i->name)) {
1611  if (all_info_except) {
1612  include_info &= ~i->value;
1613  } else {
1614  include_info |= i->value;
1615  }
1616  }
1617  }
1618 
1619  // Provide all info if nothing is specified explicitly
1620  return include_info ? include_info : CPSG_Request_Resolve::fAllInfo;
1621 }
1622 
1624 {
1625  if (!input.has("exclude_blobs")) return;
1626 
1627  auto blob_ids = input["exclude_blobs"].GetArray();
1628 
1629  for (const auto& blob_id : blob_ids) {
1630  exclude(blob_id.GetValue().GetString());
1631  }
1632 }
1633 
1634 void SRequestBuilder::SReader<CJson_ConstObject>::SetRequestFlags(shared_ptr<CPSG_Request> request) const
1635 {
1636  if (!input.has("request_flags")) return;
1637 
1638  for (const auto& request_flag : input["request_flags"].GetArray()) {
1639  const auto value = request_flag.GetValue().GetString();
1640 
1641  if (value == "exclude-hup") {
1642  request->SetFlags(CPSG_Request::fExcludeHUP);
1643 
1644  } else if (value == "include-hup") {
1645  request->SetFlags(CPSG_Request::fIncludeHUP);
1646  }
1647  }
1648 }
1649 
1650 const initializer_list<SDataFlag> kDataFlags =
1651 {
1652  { "no-tse", "Return only the info", CPSG_Request_Biodata::eNoTSE },
1653  { "slim-tse", "Return split info blob if available, or nothing", CPSG_Request_Biodata::eSlimTSE },
1654  { "smart-tse", "Return split info blob if available, or original blob", CPSG_Request_Biodata::eSmartTSE },
1655  { "whole-tse", "Return all split blobs if available, or original blob", CPSG_Request_Biodata::eWholeTSE },
1656  { "orig-tse", "Return original blob", CPSG_Request_Biodata::eOrigTSE },
1657 };
1658 
1659 const initializer_list<SDataFlag>& SRequestBuilder::GetDataFlags()
1660 {
1661  return kDataFlags;
1662 }
1663 
1664 const initializer_list<SInfoFlag> kInfoFlags =
1665 {
1666  { "all-info-except", "Return all info except explicitly specified by other flags", CPSG_Request_Resolve::fAllInfo },
1667  { "canonical-id", "Return canonical ID info", CPSG_Request_Resolve::fCanonicalId },
1668  { "name", "Use name for canonical ID info, if returned", CPSG_Request_Resolve::fName },
1669  { "other-ids", "Return other IDs info", CPSG_Request_Resolve::fOtherIds },
1670  { "molecule-type", "Return molecule type info", CPSG_Request_Resolve::fMoleculeType },
1671  { "length", "Return length info", CPSG_Request_Resolve::fLength },
1672  { "chain-state", "Return chain state info (in seq_state pair)", CPSG_Request_Resolve::fChainState },
1673  { "state", "Return state info", CPSG_Request_Resolve::fState },
1674  { "blob-id", "Return blob ID info", CPSG_Request_Resolve::fBlobId },
1675  { "tax-id", "Return tax ID info", CPSG_Request_Resolve::fTaxId },
1676  { "hash", "Return hash info", CPSG_Request_Resolve::fHash },
1677  { "date-changed", "Return date changed info", CPSG_Request_Resolve::fDateChanged },
1678  { "gi", "Return GI", CPSG_Request_Resolve::fGi },
1679 };
1680 
1681 const initializer_list<SInfoFlag>& SRequestBuilder::GetInfoFlags()
1682 {
1683  return kInfoFlags;
1684 }
1685 
1687 {
1688  CJson_Document rv(R"REQUEST_SCHEMA(
1689 {
1690  "type": "object",
1691  "definitions": {
1692  "jsonrpc": {
1693  "$id": "#jsonrpc",
1694  "enum": [
1695  "2.0"
1696  ]
1697  },
1698  "bio_id": {
1699  "$id": "#bio_id",
1700  "type": "array",
1701  "items": [
1702  {
1703  "type": "string"
1704  },
1705  {
1706  "oneOf": [
1707  {
1708  "type": "string"
1709  },
1710  {
1711  "type": "number"
1712  }
1713  ]
1714  }
1715  ],
1716  "minItems": 1,
1717  "maxItems": 2
1718  },
1719  "bio_ids": {
1720  "$id": "#bio_ids",
1721  "type": "array",
1722  "items": {
1723  "$ref": "#/definitions/bio_id"
1724  },
1725  "minItems": 1
1726  },
1727  "blob_id": {
1728  "$id": "#blob_id",
1729  "type": "array",
1730  "items": [
1731  {
1732  "type": "string"
1733  },
1734  {
1735  "type": "number"
1736  }
1737  ],
1738  "minItems": 1,
1739  "maxItems": 2
1740  },
1741  "chunk_id": {
1742  "$id": "#chunk_id",
1743  "type": "array",
1744  "items": [
1745  {
1746  "type": "number"
1747  },
1748  {
1749  "type": "string"
1750  }
1751  ],
1752  "minItems": 2,
1753  "maxItems": 2
1754  },
1755  "include_data": {
1756  "$id": "#include_data",
1757  "enum": [
1758  "no-tse",
1759  "slim-tse",
1760  "smart-tse",
1761  "whole-tse",
1762  "orig-tse"
1763  ]
1764  },
1765  "include_info": {
1766  "$id": "#include_info",
1767  "type": "array",
1768  "items": {
1769  "type": "string",
1770  "enum": [
1771  "all-info-except",
1772  "canonical-id",
1773  "name",
1774  "other-ids",
1775  "molecule-type",
1776  "length",
1777  "chain-state",
1778  "state",
1779  "blob-id",
1780  "tax-id",
1781  "hash",
1782  "date-changed",
1783  "gi"
1784  ]
1785  },
1786  "uniqueItems": true
1787  },
1788  "named_annots": {
1789  "$id": "#named_annots",
1790  "type": "array",
1791  "items": {
1792  "type": "string"
1793  }
1794  },
1795  "exclude_blobs": {
1796  "$id": "#exclude_blobs",
1797  "type": "array",
1798  "items": {
1799  "type": "string"
1800  }
1801  },
1802  "acc_substitution": {
1803  "$id": "#acc_substitution",
1804  "enum": [
1805  "default",
1806  "limited",
1807  "never"
1808  ]
1809  },
1810  "snp_scale_limit": {
1811  "$id": "#snp_scale_limit",
1812  "enum": [
1813  "default",
1814  "unit",
1815  "contig",
1816  "supercontig",
1817  "chromosome"
1818  ]
1819  },
1820  "context": {
1821  "$id": "#context",
1822  "type": "object",
1823  "properties": {
1824  "sid": {
1825  "type": "string"
1826  },
1827  "phid": {
1828  "type": "string"
1829  },
1830  "auth_token": {
1831  "type": "string"
1832  },
1833  "client_ip": {
1834  "type": "string"
1835  }
1836  }
1837  },
1838  "request_flags": {
1839  "$id": "#request_flags",
1840  "type": "array",
1841  "items": {
1842  "type": "string",
1843  "enum": [
1844  "exclude-hup",
1845  "include-hup"
1846  ]
1847  },
1848  "uniqueItems": true
1849  },
1850  )REQUEST_SCHEMA"
1851  R"REQUEST_SCHEMA(
1852  "biodata": {
1853  "$id": "#biodata",
1854  "type": "object",
1855  "properties": {
1856  "jsonrpc": {
1857  "$ref": "#/definitions/jsonrpc"
1858  },
1859  "method": {
1860  "enum": [
1861  "biodata"
1862  ]
1863  },
1864  "params": {
1865  "type": "object",
1866  "properties": {
1867  "bio_id": {
1868  "$ref": "#/definitions/bio_id"
1869  },
1870  "include_data": {
1871  "$ref": "#/definitions/include_data"
1872  },
1873  "exclude_blobs": {
1874  "$ref": "#/definitions/exclude_blobs"
1875  },
1876  "acc_substitution": {
1877  "$ref": "#/definitions/acc_substitution"
1878  },
1879  "bio_id_resolution": {
1880  "type": "boolean"
1881  },
1882  "resend_timeout": {
1883  "type": "number"
1884  },
1885  "context": {
1886  "$ref": "#/definitions/context"
1887  },
1888  "request_flags": {
1889  "$ref": "#/definitions/request_flags"
1890  },
1891  "user_args": {
1892  "type": "string"
1893  }
1894  },
1895  "required": [
1896  "bio_id"
1897  ]
1898  },
1899  "id": {
1900  "type": "string"
1901  }
1902  },
1903  "required": [
1904  "jsonrpc",
1905  "method",
1906  "params",
1907  "id"
1908  ]
1909  },
1910  "blob": {
1911  "$id": "#blob",
1912  "type": "object",
1913  "properties": {
1914  "jsonrpc": {
1915  "$ref": "#/definitions/jsonrpc"
1916  },
1917  "method": {
1918  "enum": [
1919  "blob"
1920  ]
1921  },
1922  "params": {
1923  "type": "object",
1924  "properties": {
1925  "blob_id": {
1926  "$ref": "#/definitions/blob_id"
1927  },
1928  "include_data": {
1929  "$ref": "#/definitions/include_data"
1930  },
1931  "context": {
1932  "$ref": "#/definitions/context"
1933  },
1934  "request_flags": {
1935  "$ref": "#/definitions/request_flags"
1936  },
1937  "user_args": {
1938  "type": "string"
1939  }
1940  },
1941  "required": [
1942  "blob_id"
1943  ]
1944  },
1945  "id": {
1946  "type": "string"
1947  }
1948  },
1949  "required": [
1950  "jsonrpc",
1951  "method",
1952  "params",
1953  "id"
1954  ]
1955  },
1956  "resolve": {
1957  "$id": "#resolve",
1958  "type": "object",
1959  "properties": {
1960  "jsonrpc": {
1961  "$ref": "#/definitions/jsonrpc"
1962  },
1963  "method": {
1964  "enum": [
1965  "resolve"
1966  ]
1967  },
1968  "params": {
1969  "type": "object",
1970  "properties": {
1971  "bio_id": {
1972  "$ref": "#/definitions/bio_id"
1973  },
1974  "include_info": {
1975  "$ref": "#/definitions/include_info"
1976  },
1977  "acc_substitution": {
1978  "$ref": "#/definitions/acc_substitution"
1979  },
1980  "bio_id_resolution": {
1981  "type": "boolean"
1982  },
1983  "context": {
1984  "$ref": "#/definitions/context"
1985  },
1986  "request_flags": {
1987  "$ref": "#/definitions/request_flags"
1988  },
1989  "user_args": {
1990  "type": "string"
1991  }
1992  },
1993  "required": [
1994  "bio_id"
1995  ]
1996  },
1997  "id": {
1998  "type": "string"
1999  }
2000  },
2001  "required": [
2002  "jsonrpc",
2003  "method",
2004  "params",
2005  "id"
2006  ]
2007  },
2008  "named_annot": {
2009  "$id": "#named_annot",
2010  "type": "object",
2011  "properties": {
2012  "jsonrpc": {
2013  "$ref": "#/definitions/jsonrpc"
2014  },
2015  "method": {
2016  "enum": [
2017  "named_annot"
2018  ]
2019  },
2020  "params": {
2021  "type": "object",
2022  "properties": {
2023  "bio_id": {
2024  "$ref": "#/definitions/bio_id"
2025  },
2026  "bio_ids": {
2027  "$ref": "#/definitions/bio_ids"
2028  },
2029  "named_annots": {
2030  "$ref": "#/definitions/named_annots"
2031  },
2032  "acc_substitution": {
2033  "$ref": "#/definitions/acc_substitution"
2034  },
2035  "bio_id_resolution": {
2036  "type": "boolean"
2037  },
2038  "snp_scale_limit": {
2039  "$ref": "#/definitions/snp_scale_limit"
2040  },
2041  "context": {
2042  "$ref": "#/definitions/context"
2043  },
2044  "request_flags": {
2045  "$ref": "#/definitions/request_flags"
2046  },
2047  "user_args": {
2048  "type": "string"
2049  }
2050  },
2051  "required": [
2052  "named_annots"
2053  ],
2054  "oneOf": [
2055  {
2056  "required": [
2057  "bio_id"
2058  ]
2059  },
2060  {
2061  "required": [
2062  "bio_ids"
2063  ]
2064  }
2065  ]
2066  },
2067  "id": {
2068  "type": "string"
2069  }
2070  },
2071  "required": [
2072  "jsonrpc",
2073  "method",
2074  "params",
2075  "id"
2076  ]
2077  },
2078  "chunk": {
2079  "$id": "#chunk",
2080  "type": "object",
2081  "properties": {
2082  "jsonrpc": {
2083  "$ref": "#/definitions/jsonrpc"
2084  },
2085  "method": {
2086  "enum": [
2087  "chunk"
2088  ]
2089  },
2090  "params": {
2091  "type": "object",
2092  "properties": {
2093  "chunk_id": {
2094  "$ref": "#/definitions/chunk_id"
2095  },
2096  "context": {
2097  "$ref": "#/definitions/context"
2098  },
2099  "request_flags": {
2100  "$ref": "#/definitions/request_flags"
2101  },
2102  "user_args": {
2103  "type": "string"
2104  }
2105  },
2106  "required": [
2107  "chunk_id"
2108  ]
2109  },
2110  "id": {
2111  "type": "string"
2112  }
2113  },
2114  "required": [
2115  "jsonrpc",
2116  "method",
2117  "params",
2118  "id"
2119  ]
2120  },
2121  "ipg_resolve": {
2122  "$id": "#ipg_resolve",
2123  "type": "object",
2124  "properties": {
2125  "jsonrpc": {
2126  "$ref": "#/definitions/jsonrpc"
2127  },
2128  "method": {
2129  "enum": [
2130  "ipg_resolve"
2131  ]
2132  },
2133  "params": {
2134  "type": "object",
2135  "properties": {
2136  "protein": {
2137  "type": "string"
2138  },
2139  "ipg": {
2140  "type": "number"
2141  },
2142  "nucleotide": {
2143  "type": "string"
2144  },
2145  "context": {
2146  "$ref": "#/definitions/context"
2147  },
2148  "request_flags": {
2149  "$ref": "#/definitions/request_flags"
2150  },
2151  "user_args": {
2152  "type": "string"
2153  }
2154  },
2155  "dependencies": {
2156  "nucleotide": [
2157  "protein"
2158  ]
2159  },
2160  "anyOf": [
2161  {
2162  "required": [
2163  "protein"
2164  ]
2165  },
2166  {
2167  "required": [
2168  "ipg"
2169  ]
2170  }
2171  ]
2172  },
2173  "id": {
2174  "type": "string"
2175  }
2176  },
2177  "required": [
2178  "jsonrpc",
2179  "method",
2180  "params",
2181  "id"
2182  ]
2183  },
2184  "raw": {
2185  "$id": "#raw",
2186  "type": "object",
2187  "properties": {
2188  "jsonrpc": {
2189  "$ref": "#/definitions/jsonrpc"
2190  },
2191  "method": {
2192  "enum": [
2193  "raw"
2194  ]
2195  },
2196  "params": {
2197  "type": "object",
2198  "properties": {
2199  "abs_path_ref": {
2200  "type": "string"
2201  },
2202  "context": {
2203  "$ref": "#/definitions/context"
2204  },
2205  "request_flags": {
2206  "$ref": "#/definitions/request_flags"
2207  },
2208  "user_args": {
2209  "type": "string"
2210  }
2211  },
2212  "required": [
2213  "abs_path_ref"
2214  ]
2215  },
2216  "id": {
2217  "type": "string"
2218  }
2219  },
2220  "required": [
2221  "jsonrpc",
2222  "method",
2223  "params",
2224  "id"
2225  ]
2226  }
2227  },
2228  "oneOf": [
2229  {
2230  "$ref": "#/definitions/biodata"
2231  },
2232  {
2233  "$ref": "#/definitions/blob"
2234  },
2235  {
2236  "$ref": "#/definitions/resolve"
2237  },
2238  {
2239  "$ref": "#/definitions/named_annot"
2240  },
2241  {
2242  "$ref": "#/definitions/chunk"
2243  },
2244  {
2245  "$ref": "#/definitions/ipg_resolve"
2246  },
2247  {
2248  "$ref": "#/definitions/raw"
2249  }
2250  ]
2251 }
2252  )REQUEST_SCHEMA");
2253 
2254  _DEBUG_CODE(if (!rv.ReadSucceeded()) { auto error = rv.GetReadError(); NCBI_ALWAYS_TROUBLE(error.c_str()); });
2255  _DEBUG_CODE(if (CJson_MetaSchema m; !m.Validate(rv)) { auto error = m.GetValidationError(); NCBI_ALWAYS_TROUBLE(error.c_str()); });
2256  return rv;
2257 }
2258 
User-defined methods of the data storage class.
User-defined methods of the data storage class.
Checksum and hash calculation classes.
Temporary object for holding extra message arguments.
Definition: ncbidiag.hpp:1828
CHash – Hash calculator.
Definition: checksum.hpp:195
static void SetDataLimit(size_t value)
Definition: processing.hpp:91
CJson_Object m_JsonObj
Definition: processing.hpp:152
static bool sm_SetReplyType
Definition: processing.hpp:154
static auto sm_PreviewSize
Definition: processing.hpp:156
void Set(const char *name, shared_ptr< TReplyItem > &reply_item)
Definition: processing.hpp:125
static CJsonResponse NewItem(const shared_ptr< CPSG_ReplyItem > &reply_item)
Definition: processing.cpp:245
static void SetReplyType(bool value)
Definition: processing.hpp:90
static auto sm_DataLimit
Definition: processing.hpp:155
void Fill(EPSG_Status status, TItem item, string first_message)
Definition: processing.cpp:482
static void SetPreviewSize(size_t value)
Definition: processing.hpp:92
void FillWithRequestID(EPSG_Status status, TItem item, EDoNotAddRequestID)
Definition: processing.cpp:212
void push_back(void)
Add null element to the end of the array.
CJson_Array.
size_t size(void) const
Return the number of elements in the array.
CJson_Node.
bool IsObject(void) const
CJson_ConstObject GetObject(void) const
Get JSON object contents of the node.
CJson_ConstArray GetArray(void) const
Get JSON array contents of the node.
bool IsNull(void) const
CJson_ConstValue GetValue(void) const
Get JSON value contents of the node.
EJsonType GetType(void) const
Get value type.
CJson_Object.
const_iterator find(const CJson_Node::TKeyType &name) const
Return an iterator that points to the location of the element.
const_iterator end(void) const
Return an iterator that points to the location after the last element.
CJson_Value.
Uint4 GetUint4(void) const
Uint8 GetUint8(void) const
bool GetBool(void) const
Get primitive value data.
Int8 GetInt8(void) const
Int4 GetInt4(void) const
bool IsInt8(void) const
TStringType GetString(void) const
bool IsUint8(void) const
double GetDouble(void) const
bool IsUint4(void) const
bool IsInt4(void) const
bool IsDouble(void) const
bool IsNumber(void) const
bool Read(std::istream &in)
Read JSON data from a stream.
std::string GetReadError(void) const
Get most recent read error.
bool ParseString(const TStringType &v)
Read JSON data from a UTF8 string.
bool ReadSucceeded(void) const
Test if the most recent read was successful.
bool Write(std::ostream &out, TJson_Write_Flags flags=fJson_Write_IndentWithSpace, unsigned int indent_char_count=4) const
Write JSON data into a stream.
JSON meta-schema describes JSON schema itself So, other JSON schemas can be validated against it.
CJson_Object ResetObject(void)
Erase node data and convert it into JSON object.
CJson_Array ResetArray(void)
Erase node data and convert it into JSON array.
CJson_Node & AssignCopy(const CJson_ConstNode &n)
Copy Node contents data into this node.
CJson_Node & SetNull(void)
Erase node data and convert it into JSON NULL value.
CJson_Value SetValue(void)
Get JSON value contents of the node.
void insert(const CJson_Node::TKeyType &name)
Insert null element into the object.
CJson_Object insert_object(const CJson_Node::TKeyType &name)
Insert object type element into the object.
CJson_Schema.
bool Validate(const CJson_Document &v)
Validate JSON document against schema.
std::string GetValidationError() const
Return validation error.
void Start(EWhich which)
CObjectStreamCopier –.
Definition: objcopy.hpp:71
CObjectTypeInfo –.
Definition: objectinfo.hpp:94
Bio-id (such as accession)
Definition: psg_client.hpp:174
const string & GetId() const
Get ID.
Definition: psg_client.hpp:195
TType GetType() const
Get type.
Definition: psg_client.hpp:198
Blob unique ID.
Definition: psg_client.hpp:225
const string & GetId() const
Get ID.
Definition: psg_client.hpp:249
const TLastModified & GetLastModified() const
Get last modified.
Definition: psg_client.hpp:252
CNullable< Int8 > TLastModified
Definition: psg_client.hpp:227
Chunk unique ID.
Definition: psg_client.hpp:264
const string & GetId2Info() const
Get ID2 info.
Definition: psg_client.hpp:278
int GetId2Chunk() const
Get ID2 chunk number.
Definition: psg_client.hpp:275
A class derived from the queue class that additionally allows to run event loop.
bool Run(CDeadline deadline)
Process everything in the queue until it's empty or times out.
A queue to retrieve data (accession resolution info; bio-sequence; annotation blobs) from the storage...
bool SendRequest(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue.
shared_ptr< CPSG_Reply > SendRequestAndGetReply(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue and get corresponding reply.
void Stop()
Stop accepting new requests.
@ eEndOfReply
No more items expected in the (overall!) reply.
Definition: psg_client.hpp:665
@ eNoTSE
Only the info.
Definition: psg_client.hpp:339
@ eSmartTSE
If ID2 split is available, return split info blob only.
Definition: psg_client.hpp:347
@ eSlimTSE
If ID2 split is available, return split info blob only.
Definition: psg_client.hpp:343
@ eWholeTSE
If ID2 split is available, return all split blobs.
Definition: psg_client.hpp:351
@ eOrigTSE
Return all Cassandra data chunks of the blob itself.
Definition: psg_client.hpp:354
CNullable< string > TNucleotide
Definition: psg_client.hpp:599
vector< string > TAnnotNames
Names of the named annotations.
Definition: psg_client.hpp:497
@ fName
Requests name to use for canonical bio-id.
Definition: psg_client.hpp:422
CParallelProcessing(const TParams &params)
list< SThread > m_Threads
Definition: processing.hpp:315
list< CPSG_EventLoop > m_PsgQueues
Definition: processing.hpp:314
static vector< shared_ptr< CPSG_Request > > ReadCommands(TCreateContext create_context, size_t report_progress_after=0)
static int JsonCheck(istream *schema_is)
static int OneRequest(const SOneRequestParams &params, shared_ptr< CPSG_Request > request)
Definition: processing.cpp:822
static CParallelProcessing< SBatchResolveParams > CreateParallelProcessing(const SBatchResolveParams &params)
static CJson_Document RequestSchema()
static bool ReadLine(string &line, istream &is=cin)
static int Performance(const SPerformanceParams &params)
Take guard of the current CRequestContext, handle app-state, start/stop logging and request status in...
CTypeInfo class contains all information about C++ types (both basic and classes): members and layout...
Definition: typeinfo.hpp:76
CZipStreamDecompressor – zlib based decompression stream processor.
Definition: zlib.hpp:817
Definition: map.hpp:338
void Print(const CCompactSAMApplication::AlignInfo &ai)
char value[7]
Definition: config.c:431
CS_CONTEXT * ctx
Definition: t0006.c:12
static const struct name_t names[]
std::ofstream out("events_result.xml")
main entry point for tests
static int type
Definition: getdata.c:31
#define TAX_ID_TO(T, tax_id)
Definition: ncbimisc.hpp:1110
#define GI_TO(T, gi)
Definition: ncbimisc.hpp:1085
@ null
Definition: ncbimisc.hpp:646
@ eTakeOwnership
An object can take ownership of another.
Definition: ncbi_types.h:136
string
Definition: cgiapp.hpp:687
#define NCBI_ALWAYS_TROUBLE(mess)
Definition: ncbidbg.hpp:85
#define _DEBUG_CODE(code)
Definition: ncbidbg.hpp:136
#define _VERIFY(expr)
Definition: ncbidbg.hpp:161
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
void SetRequestID(TCount rid)
Set request ID.
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
Definition: ncbidiag.cpp:1901
void SetStatus(int status)
Set request context status.
@ e499_BrokenConnection
Non-standard status code - used to indicate broken connection while serving normal request.
string ReportThis(TDiagPostFlags flags=eDPF_Exception) const
Report this exception only.
Definition: ncbiexpt.cpp:397
#define ENUM_METHOD_NAME(EnumName)
Definition: serialbase.hpp:994
ESerialDataFormat
Data file format.
Definition: serialdef.hpp:71
@ eSerial_AsnText
ASN.1 text.
Definition: serialdef.hpp:73
@ eSerial_Xml
XML.
Definition: serialdef.hpp:75
@ eSerial_Json
JSON.
Definition: serialdef.hpp:76
@ eSerial_None
Definition: serialdef.hpp:72
@ eSerial_AsnBinary
ASN.1 binary.
Definition: serialdef.hpp:74
static CObjectOStream * Open(ESerialDataFormat format, CNcbiOstream &outStream, bool deleteOutStream)
Create serial object writer and attach it to an output stream.
Definition: objostr.cpp:126
static CObjectIStream * Open(ESerialDataFormat format, CNcbiIstream &inStream, bool deleteInStream)
Create serial object reader and attach it to an input stream.
Definition: objistr.cpp:195
void Copy(const CObjectTypeInfo &type)
Copy data.
Definition: objcopy.cpp:74
int64_t Int8
8-byte (64-bit) signed integer
Definition: ncbitype.h:104
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string Base64Encode(const CTempString str, size_t line_len=0)
Base64-encode string.
Definition: ncbistr.cpp:6266
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
static string Base64Decode(const CTempString str)
Base64-decode string.
Definition: ncbistr.cpp:6282
@ eNoWait
No-wait, expires immediately.
Definition: ncbitime.hpp:1846
@ eInfinite
Infinite deadline.
Definition: ncbitime.hpp:1845
E_Choice
Choice variants.
Definition: Seq_id_.hpp:93
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
@ e_not_set
static int input()
int i
yy_size_t n
static MDB_envinfo info
Definition: mdb_load.c:37
constexpr auto sort(_Init &&init)
constexpr bool empty(list< Ts... >) noexcept
EIPRangeType t
Definition: ncbi_localip.c:101
T min(T x_, T y_)
static Format format
Definition: njn_ioutil.cpp:53
std::istream & in(std::istream &in_, double &x_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
static const char * prefix[]
Definition: pcregrep.c:405
string ReadLine(CNcbiIstream &in)
Definition: phrap.cpp:74
static SQLCHAR output[256]
Definition: print.c:5
void s_ItemComplete< SInteractiveParams >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
void s_ItemComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:792
CRequestStatus::ECode s_PsgStatusToRequestStatus(EPSG_Status psg_status)
EJsonRpcErrors
Definition: processing.cpp:56
@ eJsonRpc_InvalidRequest
Definition: processing.cpp:58
@ eJsonRpc_ExceptionOnRead
Definition: processing.cpp:59
@ eJsonRpc_ParseError
Definition: processing.cpp:57
const char * s_StrStatus(EPSG_Status status)
Definition: processing.cpp:127
false_type no_server_mode
string s_ProgressStatusToString(CPSG_Processor::EProgressStatus progress_status)
Definition: processing.cpp:442
string s_GetId(const CJson_Document &req_doc)
auto s_IsRawResponse(const CPSG_BlobId *blob_id)
Definition: processing.cpp:311
void s_ReplyComplete< SOneRequestParams >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:804
shared_ptr< CPSG_Reply > s_GetReply(shared_ptr< CPSG_ReplyItem > &item)
void s_NewItem(SJsonOut &, const shared_ptr< CPSG_ReplyItem > &)
Definition: processing.cpp:812
void s_ReplyComplete< SInteractiveParams, testing, server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
const initializer_list< SInfoFlag > kInfoFlags
true_type testing
void s_ReplyComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:798
void s_NewItem< SInteractiveParams, verbose >(SJsonOut &json_out, const shared_ptr< CPSG_ReplyItem > &item)
false_type no_testing
const initializer_list< SDataFlag > kDataFlags
true_type server_mode
void s_ReplyComplete< SInteractiveParams, testing, no_server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
ESerialDataFormat s_GetInputFormat(const string &format)
Definition: processing.cpp:641
void s_ReplyComplete< no_verbose >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:920
false_type no_verbose
Definition: processing.cpp:903
void s_ReplyComplete< SInteractiveParams, no_testing, no_server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
TTypeInfo s_GetInputType(const shared_ptr< CPSG_BlobData > &blob_data)
Definition: processing.cpp:650
auto s_IsRawRequest(shared_ptr< const CPSG_Request > &request)
Definition: processing.cpp:306
void s_ItemComplete< no_verbose >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:906
const char * s_GetItemName(CPSG_ReplyItem::EType type, bool trouble=true)
Definition: processing.cpp:227
true_type verbose
Definition: processing.cpp:902
string s_ReasonToString(CPSG_SkippedBlob::EReason reason)
Definition: processing.cpp:369
void s_ReplyComplete< SInteractiveParams, no_testing, server_mode >(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
void s_NewItem< SOneRequestParams >(SJsonOut &json_out, const shared_ptr< CPSG_ReplyItem > &reply_item)
Definition: processing.cpp:817
EPSG_Status
Retrieval result.
Definition: psg_client.hpp:625
@ eSuccess
Successfully retrieved.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eForbidden
User is not authorized for the retrieval.
@ eCanceled
Request canceled.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
@ eNotFound
Not found.
vector< CPSG_BioId > CPSG_BioIds
Definition: psg_client.hpp:205
static const char * schema
Definition: stats.c:20
SImpl(const TParams &params)
void(*)(SJsonOut &, EPSG_Status, const shared_ptr< CPSG_ReplyItem > &) TItemComplete
Definition: processing.hpp:288
void(*)(SJsonOut &, EPSG_Status, const shared_ptr< CPSG_Reply > &) TReplyComplete
Definition: processing.hpp:289
void Init(const TParams &params)
Definition: processing.cpp:877
TReplyComplete GetReplyComplete()
Definition: processing.cpp:947
TItemComplete GetItemComplete()
Definition: processing.cpp:936
void Submitter(CPSG_Queue &output)
void(*)(SJsonOut &, const shared_ptr< CPSG_ReplyItem > &) TNewItem
Definition: processing.hpp:290
bool ReportErrors(EPSG_Status status, TItem item, const char *prefix)
Definition: processing.cpp:581
unordered_map< string, pair< shared_ptr< CPSG_BlobInfo >, shared_ptr< CPSG_BlobData > > > m_Data
Definition: processing.cpp:576
SDataOnlyCopy(const SOneRequestParams::SDataOnly &params)
Definition: processing.cpp:559
const SOneRequestParams::SDataOnly & m_Params
Definition: processing.cpp:575
EPSG_Status m_Status
Definition: processing.cpp:577
void ItemComplete(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:620
void ReplyComplete(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:636
void Process(shared_ptr< CPSG_BlobInfo > blob_info)
Definition: processing.cpp:661
CDiagContext_Extra & Print(const string &name, const string &value)
The method does not print the argument, but adds it to the string.
Definition: ncbidiag.cpp:2622
void Print(const string &prefix, CJson_ConstValue json)
SInteractiveNewRequestStart(const string &request, CJson_ConstObject params_obj)
const size_t preview_size
Definition: processing.hpp:236
static string GetService(string service, bool one_server)
Definition: processing.cpp:536
const size_t data_limit
Definition: processing.hpp:235
char m_Separator
Definition: processing.hpp:75
const TJson_Write_Flags m_Flags
Definition: processing.hpp:77
mutex m_Mutex
Definition: processing.hpp:74
SJsonOut & operator<<(const CJson_Document &doc)
Definition: processing.cpp:141
const bool m_Pipe
Definition: processing.hpp:76
static const char * Name(EType t)
Definition: processing.cpp:106
void Set(EType t)
Definition: performance.hpp:84
void OutputItems(ostream &os) const
Definition: processing.cpp:550
vector< TItem > m_Items
Definition: performance.hpp:98
SNewRequestContext(const SNewRequestContext &)=delete
CRef< CRequestContext > Get() const
Definition: processing.cpp:76
void operator=(const SNewRequestContext &)=delete
CRef< CRequestContext > m_RequestContext
Definition: processing.cpp:82
static void ReplyComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:741
static void ItemComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:736
void ReplyComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: processing.cpp:778
map< CPSG_ReplyItem::EType, shared_ptr< CPSG_ReplyItem > > m_Items
Definition: processing.cpp:758
void ItemComplete(SJsonOut &json_out, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: processing.cpp:762
const ESerialDataFormat output_format
Definition: processing.hpp:186
const CLogLatencies::EWhich which
Definition: processing.hpp:178
SDataOnly data_only
Definition: processing.hpp:190
const SPSG_UserArgs user_args
Definition: processing.hpp:163
const string service
Definition: processing.hpp:161
static bool verbose
Definition: processing.hpp:165
const CPSG_Request::TFlags request_flags
Definition: processing.hpp:162
const bool local_queue
Definition: processing.hpp:255
const double delay
Definition: processing.hpp:254
const size_t user_threads
Definition: processing.hpp:253
const bool report_immediately
Definition: processing.hpp:256
static CPSG_Request_Resolve::TIncludeInfo GetIncludeInfo(TSpecified specified)
static CPSG_BioId::TType GetBioIdType(const string &type)
function< void(string)> TExclude
Definition: processing.hpp:396
static shared_ptr< TRequest > Build(const TInput &input, TArgs &&... args)
Definition: processing.hpp:495
static const initializer_list< SInfoFlag > & GetInfoFlags()
static const initializer_list< SDataFlag > & GetDataFlags()
function< bool(const string &)> TSpecified
Definition: processing.hpp:395
Definition: inftrees.h:24
Definition: _hash_fun.h:40
Definition: type.c:6
#define _TROUBLE
#define _ASSERT
else result
Definition: token2.c:20
ZLib Compression API.
Modified on Sat Dec 02 09:22:42 2023 by modify_doxy.py rev. 669887