NCBI C++ ToolKit
psg_evloop.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: psg_evloop.cpp 103099 2024-09-06 18:05:48Z vasilche $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Eugene Vasilchenko, Aleksey Grichenko
27  *
28  * File Description: Event loop for PSG data loader
29  *
30  * ===========================================================================
31  */
32 
33 #include <ncbi_pch.hpp>
38 #include <util/thread_pool.hpp>
39 #include <corelib/ncbithr.hpp>
40 #include <corelib/ncbi_system.hpp>
41 
42 #if defined(HAVE_PSG_LOADER)
43 
45 //#define NCBI_USE_ERRCODE_X PSGLoader
46 //NCBI_DEFINE_ERR_SUBCODE_X(1);
49 
50 
51 /////////////////////////////////////////////////////////////////////////////
52 // Object references held:
53 //
54 // CPSGL_Queue:
55 // is created by PSG loader
56 // is destructed by PSG loader
57 // uses CPSGL_RequestTracker(s) for callback while they are registered by CPSGL_QueueGuard
58 //
59 // CPSGL_QueueGuard:
60 // is created only on stack by some caller method
61 // is destructed at the end of the caller
62 // does cleanup in destructor:
63 // stops unfinished background task(s)
64 // deregisters CPSGL_RequestTracker(s) from CPSGL_Queue
65 // holds CPSGL_Queue
66 // holds CPSGL_RequestTracker(s) until they are processed and results are obtained by the caller
67 //
68 // CPSGL_RequestTracker:
69 // is created by CPSGL_QueueGuard
70 // is destructed by CPSGL_QueueGuard
71 // holds CPSG_Request
72 // holds CPSG_Reply
73 // holds CPSGL_Processor
74 // holds background task(s)
75 // uses CPSGL_QueueGuard for starting background threads
76 // uses CPSGL_QueueGuard to notify end of processing
77 //
78 // background task:
79 // is created by CPSGL_RequestTracker
80 // is destructed by last reference
81 // holds CPSGL_RequestTracker until finished
82 //
83 // CPSGL_Processor:
84 // is created by the caller
85 // is destructed by the caller
86 // holds results needed by the caller
87 //
88 // CPSGL_ResultGuard:
89 // is created by CPSGL_QueueGuard
90 // is destructed by the caller
91 // holds CPSGL_Processor
92 //
93 //
94 /////////////////////////////////////////////////////////////////////////////
95 // NOTE:
96 //
97 // A runtime circular reference will appear between CPSGL_RequestTracker and its
98 // background task(s).
99 // The circle will be broken either:
100 // 1. when the background task finishes
101 // 2. by CPSGL_QueueGuard destructor
102 // - it will also request to cancel the task(s)
103 //
104 
105 /////////////////////////////////////////////////////////////////////////////
106 // CPSGL_Queue
107 /////////////////////////////////////////////////////////////////////////////
108 
109 
// Construct the queue for the PSG service named `service_name`.
//
// m_EventLoop is initialized with the service name and two callbacks bound to
// this object: ProcessItemCallback for reply items and ProcessReplyCallback
// for whole replies. Both bindings capture the raw `this` pointer.
//
// m_EventLoopThread is then constructed to run CPSG_EventLoop::Run on
// m_EventLoop with an infinite deadline.
110 CPSGL_Queue::CPSGL_Queue(const string& service_name)
111  : m_EventLoop(service_name,
112  bind(&CPSGL_Queue::ProcessItemCallback, this, placeholders::_1, placeholders::_2),
113  bind(&CPSGL_Queue::ProcessReplyCallback, this, placeholders::_1, placeholders::_2)),
     // the event loop is passed by reference (ref), not copied, into the thread
114  m_EventLoopThread(&CPSG_EventLoop::Run, ref(m_EventLoop), CDeadline::eInfinite)
115 {
116 }
117 
118 
120 {
121  m_EventLoop.Reset();
122  m_EventLoopThread.join();
123 }
124 
125 
127 {
129 }
130 
131 
133 {
135  _VERIFY(m_TrackerMap.insert(make_pair(tracker->GetRequest().get(), tracker)).second);
136 }
137 
138 
140 {
142  m_TrackerMap.erase(tracker->GetRequest().get());
143 }
144 
145 
// Find the request tracker associated with `reply`.
//
// The lookup key is the raw pointer of the request that produced the reply
// (reply->GetRequest().get()), searched in m_TrackerMap.
// Returns a CRef to the tracker, or a null CRef when no entry is found.
146 CRef<CPSGL_RequestTracker> CPSGL_Queue::GetTracker(const shared_ptr<CPSG_Reply>& reply)
147 {
    // NOTE(review): listing line 148 is absent from this extract; presumably it
    // locks the tracker map before the lookup - confirm against the repository.
149  auto iter = m_TrackerMap.find(reply->GetRequest().get());
150  if ( iter == m_TrackerMap.end() ) {
     // no tracker stored under this request
151  return null;
152  }
     // the map stores raw tracker pointers; wrap the found one in a CRef
153  return Ref(iter->second);
154 }
155 
156 
// Overload for reply items: obtains the reply the item belongs to via
// item->GetReply() and forwards to the reply-based GetTracker() overload,
// returning whatever that overload returns.
157 inline
158 CRef<CPSGL_RequestTracker> CPSGL_Queue::GetTracker(const shared_ptr<CPSG_ReplyItem>& item)
159 {
160  return GetTracker(item->GetReply());
161 }
162 
163 
165 {
166  shared_ptr<CPSG_Request> request = tracker->GetRequest();
167  if ( m_RequestContext ) {
168  request->SetRequestContext(m_RequestContext);
169  }
171 }
172 
173 
175  const shared_ptr<CPSG_ReplyItem>& item)
176 {
177  if ( auto tracker = GetTracker(item) ) {
178  tracker->ProcessItemCallback(status, item);
179  }
180 }
181 
182 
184  const shared_ptr<CPSG_Reply>& reply)
185 {
186  if ( auto tracker = GetTracker(reply) ) {
187  tracker->ProcessReplyCallback(status, reply);
188  }
189 }
190 
191 
192 /////////////////////////////////////////////////////////////////////////////
193 // CPSGL_RequestTracker
194 /////////////////////////////////////////////////////////////////////////////
195 
196 
198  const shared_ptr<CPSG_Request>& request,
199  const CRef<CPSGL_Processor>& processor,
200  size_t index)
201  : m_QueueGuard(queue_guard),
202  m_Request(request),
203  m_Processor(processor),
204  m_Index(index),
205  m_Status(CThreadPool_Task::eExecuting),
206  m_ReplyStatus(EPSG_Status::eInProgress),
207  m_NeedsFinalization(false),
208  m_BackgroundTasksSemaphore(0, kMax_UInt),
209  m_BackgroundItemTasks(0),
210  m_Callbacks(0)
211 {
212 }
213 
214 
216 {
218 }
219 
220 
222 {
223 public:
225  EPSG_Status item_status,
226  const shared_ptr<CPSG_ReplyItem>& item)
227  : m_Tracker(tracker),
228  m_ItemStatus(item_status),
229  m_Item(item)
230  {
231  }
232  explicit
233  CBackgroundTask(CPSGL_RequestTracker* tracker) // for reply processing
234  : m_Tracker(tracker),
236  {
237  }
238 
239  static bool s_IsFinished(EStatus status)
240  {
241  return status >= eCompleted;
242  }
243  static bool s_IsAborted(EStatus status)
244  {
245  return status > eCompleted;
246  }
247 
248  EStatus Execute() override
249  {
250  if ( m_Item ) {
252  }
253  else {
255  }
256  }
257 
258  void OnStatusChange(EStatus old_task_status) override
259  {
260  m_Tracker->OnStatusChange(this, old_task_status);
261  }
262 
265  shared_ptr<CPSG_ReplyItem> m_Item;
266 };
267 
268 
270 {
271 public:
272  explicit
274  : m_Tracker(tracker)
275  {
278  m_Tracker = nullptr;
279  }
280  else {
282  }
283  }
284 
286  {
287  if ( !m_Tracker ) {
288  return;
289  }
291  if ( --m_Tracker->m_Callbacks == 0 && m_Tracker->m_BackgroundTasks.empty() ) {
293  }
294  }
295 
296  operator bool() const {
297  return m_Tracker;
298  }
299 
300  CCallbackGuard(const CCallbackGuard&) = delete;
302 
303  void* operator new(size_t) = delete;
304  void* operator new[](size_t) = delete;
305 
306 private:
308 };
309 
310 
312 {
313  MarkAsCanceled();
315  Reset();
316 }
317 
318 
320 {
321  m_Reply.reset();
322  m_Processor.Reset();
323  m_Request.reset();
324 }
325 
326 
328 {
329  TBackgroundTasks tasks;
330  {{
332  tasks = m_BackgroundTasks;
333  }}
334  for ( auto& task : tasks ) {
335  task.GetNCObject().RequestToCancel();
336  }
338 }
339 
340 
342 {
343  for ( ;; ) {
344  {{
346  if ( m_BackgroundTasks.empty() && !m_Callbacks ) {
347  return;
348  }
349  }}
351  }
352 }
353 
354 
355 inline
357 {
358  {{
361  if ( task->m_Item ) {
363  }
364  }}
366 }
367 
368 
370  const shared_ptr<CPSG_ReplyItem>& item)
371 {
372  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::StartProcessItemInBackground()");
373  _ASSERT(!m_Reply);
374  QueueInBackground(Ref(new CBackgroundTask(this, status, item)));
375 }
376 
377 
379 {
380  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::StartProcessReplyInBackground()");
382  _ASSERT(m_Reply);
384 }
385 
386 
388  CThreadPool_Task::EStatus old_task_status)
389 {
390  auto new_task_status = task->GetStatus();
391  if ( CBackgroundTask::s_IsFinished(new_task_status) ) {
392  {{
394  m_BackgroundTasks.erase(Ref(task));
395  }}
396  if ( CBackgroundTask::s_IsAborted(new_task_status) ) {
397  // save error status
398  m_Status = new_task_status;
400  }
401  {{
403  if ( m_BackgroundTasks.empty() && !m_Callbacks ) {
405  }
406  }}
407  }
408 }
409 
410 
412 {
414  {{
416  CThreadPool_Task::EStatus old_status = m_Status;
417  if ( status > old_status ) {
418  m_Status = status;
419  }
420  }}
422 }
423 
424 
425 inline
427 {
429 }
430 
431 
432 inline
434 {
436 }
437 
438 
439 inline
441 {
443  m_NeedsFinalization = true;
445 }
446 
447 
448 inline
450 {
452 }
453 
454 
456  const shared_ptr<CPSG_ReplyItem>& item)
457 {
458  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::ProcessItemCallback()");
459  CCallbackGuard guard(this);
460  if ( !guard ) {
461  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::ProcessItemCallback() - canceled");
462  return;
463  }
464  try {
465  _ASSERT(item);
466  auto result = m_Processor->ProcessItemFast(status, item);
468  // queue background processing
469  StartProcessItemInBackground(status, item);
470  }
471  else if ( result != CPSGL_Processor::eProcessed ) {
472  ERR_POST("CPSGDataLoader: failed processing reply item: "<<result);
473  MarkAsFailed();
474  }
475  }
476  catch ( exception& exc ) {
477  ERR_POST("CPSGDataLoader: exception while processing reply item: "<<exc.what());
478  MarkAsFailed();
479  }
480 }
481 
482 
484  const shared_ptr<CPSG_Reply>& reply)
485 {
486  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::ProcessReplyCallback()");
487  CCallbackGuard guard(this);
488  if ( !guard ) {
489  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::ProcessReplyCallback() - canceled");
490  return;
491  }
492  try {
493  _ASSERT(reply);
494  {{
495  // items may be still being processed in background
496  // and we cannot yet process reply in this case
498  _ASSERT(!m_Reply);
499  m_ReplyStatus = status;
500  m_Reply = reply;
501  if ( m_BackgroundItemTasks > 0 ) {
502  // not all items processed
503  // the reply will be processed by the last item task
504  return;
505  }
506  }}
507  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::ProcessReplyCallback(): ProcessReplyFast()");
508  auto result = m_Processor->ProcessReplyFast(status, reply);
509  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::ProcessReplyCallback(): ProcessReplyFast(): "<<result);
511  // queue processing
513  }
514  else if ( result != CPSGL_Processor::eProcessed ) {
515  ERR_POST("CPSGDataLoader: failed processing reply: "<<result);
516  MarkAsFailed();
517  }
518  else {
519  MarkAsCompleted();
520  }
521  }
522  catch ( exception& exc ) {
523  ERR_POST("CPSGDataLoader: exception while processing reply: "<<exc.what());
524  MarkAsFailed();
525  }
526 }
527 
528 
531  EPSG_Status status,
532  const shared_ptr<CPSG_ReplyItem>& item)
533 {
534  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessItemCallback()");
535  try {
536  {{
537  // check if canceled
540  // failed or canceled, no need to process
543  return m_Status;
544  }
545  }}
546  {{
547  // process item
548  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessItemCallback(): ProcessItemSlow()");
549  auto result = m_Processor->ProcessItemSlow(status, item);
551  ERR_POST("CPSGDataLoader: failed processing reply item: "<<result);
552  MarkAsFailed();
554  }
555  }}
556  {{
557  // check if result needs to be processed too
560  // failed or canceled, no need to process
563  return m_Status;
564  }
565  if ( --m_BackgroundItemTasks > 0 || !m_Reply ) {
566  // either there are other background item tasks
567  // or reply wasn't received yet
569  }
572  }}
573  {{
574  // process reply, first 'fast' call
575  _ASSERT(m_Reply);
576  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessItemCallback(): ProcessReplyFast()");
578  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessItemCallback(): ProcessReplyFast(): "<<result);
580  MarkAsCompleted();
582  }
583  else if ( result != CPSGL_Processor::eToNextStage ) {
584  ERR_POST("CPSGDataLoader: failed processing reply: "<<result);
585  MarkAsFailed();
586  }
587  }}
588  {{
589  // process reply, regular call
590  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessItemCallback(): ProcessReply()");
592  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessItemCallback(): ProcessReplySlow(): "<<result);
594  MarkAsCompleted();
595  }
596  else if ( result != CPSGL_Processor::eToNextStage ) {
597  ERR_POST("CPSGDataLoader: failed processing reply: "<<result);
598  MarkAsFailed();
599  }
600  else {
602  }
604  }}
605  }
606  catch ( exception& exc ) {
607  ERR_POST("CPSGDataLoader: exception while processing reply item: "<<exc.what());
608  MarkAsFailed();
610  }
611 }
612 
613 
616 {
617  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::BackgroundProcessReplyCallback()");
618  try {
619  {{
622  // failed or canceled, no need to process
625  return m_Status;
626  }
628  }}
630  _ASSERT(m_Reply);
633  MarkAsCompleted();
635  }
636  else if ( result != CPSGL_Processor::eToNextStage ) {
637  ERR_POST("CPSGDataLoader: failed processing reply: "<<result);
638  MarkAsFailed();
640  }
641  else {
644  }
645  }
646  catch ( exception& exc ) {
647  ERR_POST("CPSGDataLoader: exception while processing reply: "<<exc.what());
648  MarkAsFailed();
650  }
651 }
652 
653 
655 {
657  if ( m_NeedsFinalization ) {
659  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::FinalizeResult(): calling");
660  try {
662  _TRACE("CPSGL_RequestTracker("<<this<<", "<<m_Processor<<")::FinalizeResult(): finalized: "<<result);
665  }
666  }
667  catch ( exception& exc ) {
668  ERR_POST("CPSGDataLoader: exception while processing reply: "<<exc.what());
670  }
671  }
673  result.m_Processor = std::move(m_Processor);
674  result.m_Index = m_Index;
675  result.m_Status = m_Status;
676  result.m_ReplyStatus = m_ReplyStatus;
677  Reset();
678  return result;
679 }
680 
681 
682 /////////////////////////////////////////////////////////////////////////////
683 // CPSGL_QueueGuard
684 /////////////////////////////////////////////////////////////////////////////
685 
687  CPSGL_Queue& queue)
688  : m_ThreadPool(thread_pool),
689  m_Queue(&queue),
690  m_CompleteSemaphore(0, kMax_UInt)
691 {
692 }
693 
694 
696 {
697  CancelAll();
698 }
699 
700 
702 {
703  while ( auto tracker = GetQueuedRequest() ) {
704  tracker->Cancel();
706  }
707 }
708 
709 
711 {
713  return m_QueuedRequests.empty()? null: *m_QueuedRequests.begin();
714 }
715 
716 
717 #ifdef _DEBUG
719 {
720  switch (type) {
721  case CPSG_Request::eBiodata: return "biodata";
722  case CPSG_Request::eResolve: return "resolve";
723  case CPSG_Request::eBlob: return "blob";
724  case CPSG_Request::eNamedAnnotInfo: return "annot";
725  case CPSG_Request::eChunk: return "chunk";
726  case CPSG_Request::eIpgResolve: return "ipg_resolve";
727  }
728 
729  // Should not happen
730  _TROUBLE;
731  return "unknown";
732 }
733 #endif
734 
735 
// Create a tracker for a new request and submit it to the queue.
//
// request   - the PSG request to send
// processor - the processor that will handle the request's reply
// index     - caller-supplied index stored in the tracker
//
// A CPSGL_RequestTracker is created from this guard and the three arguments.
// The tracker is then handed to m_Queue->SendRequest(); if that returns false,
// an error is posted and the tracker is marked as failed.
736 void CPSGL_QueueGuard::AddRequest(const shared_ptr<CPSG_Request>& request,
737  const CRef<CPSGL_Processor>& processor,
738  size_t index)
739 {
740  CRef<CPSGL_RequestTracker> tracker(new CPSGL_RequestTracker(*this, request, processor, index));
     // trace message spelling corrected ("requst" -> "request")
741  _TRACE("CPSGL_QueueGuard::AddRequest(): CPSGL_RequestTracker("<<tracker<<", "<<tracker->m_Processor<<") for request "<<s_GetRequestTypeName(request->GetType())<<" "<<request->GetId());
     // NOTE(review): listing lines 743-744 and 746 are absent from this extract,
     // so the contents of the scoped block below are not visible here.
742  {{
745  }}
747  if ( !m_Queue->SendRequest(tracker) ) {
748  ERR_POST("CPSGDataLoader: cannot send request");
749  tracker->MarkAsFailed();
750  }
751 }
752 
753 
755 {
756  _TRACE("CPSGL_QueueGuard::MarkAsFinished(): tracker: "<<tracker);
758  {{
760  if ( m_QueuedRequests.erase(tracker) ) {
761  m_CompleteRequests.push_back(tracker);
762  }
764  }}
765 }
766 
767 
769 {
771  for ( ;; ) {
772  {{
774  if ( !m_CompleteRequests.empty() ) {
775  tracker = m_CompleteRequests.front();
776  m_CompleteRequests.pop_front();
777  break;
778  }
779  if ( m_QueuedRequests.empty() ) {
780  break;
781  }
782  }}
784  }
785  if ( tracker ) {
786  _TRACE("CPSGL_QueueGuard::GetNextResult(): tracker: "<<tracker);
787  return tracker->FinalizeResult();
788  }
789  return CPSGL_ResultGuard();
790 }
791 
792 
793 /////////////////////////////////////////////////////////////////////////////
794 // CPSGL_ResultGuard
795 /////////////////////////////////////////////////////////////////////////////
796 
798  : m_Index(0),
799  m_Status(CThreadPool_Task::eIdle),
800  m_ReplyStatus(EPSG_Status::eError)
801 {
802 }
803 
804 
806 {
807 }
808 
809 
812 
813 
817 
818 #endif // HAVE_PSG_LOADER
CDeadline.
Definition: ncbitime.hpp:1830
virtual EProcessResult ProcessItemSlow(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
virtual EProcessResult ProcessReplyFast(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
virtual EProcessResult ProcessReplySlow(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
virtual EProcessResult ProcessItemFast(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
virtual EProcessResult ProcessReplyFinal()
CRef< CPSGL_Queue > m_Queue
Definition: psg_evloop.hpp:270
void AddRequest(const shared_ptr< CPSG_Request > &request, const CRef< CPSGL_Processor > &processor, size_t index=0)
Definition: psg_evloop.cpp:736
set< CRef< CPSGL_RequestTracker > > m_QueuedRequests
Definition: psg_evloop.hpp:274
CThreadPool & m_ThreadPool
Definition: psg_evloop.hpp:269
void MarkAsFinished(const CRef< CPSGL_RequestTracker > &request_processor)
Definition: psg_evloop.cpp:754
list< CRef< CPSGL_RequestTracker > > m_CompleteRequests
Definition: psg_evloop.hpp:275
CFastMutex m_CompleteMutex
Definition: psg_evloop.hpp:272
friend class CPSGL_RequestTracker
Definition: psg_evloop.hpp:263
CRef< CPSGL_RequestTracker > GetQueuedRequest()
Definition: psg_evloop.cpp:710
CPSGL_ResultGuard GetNextResult()
Definition: psg_evloop.cpp:768
CSemaphore m_CompleteSemaphore
Definition: psg_evloop.hpp:273
CPSGL_QueueGuard(CThreadPool &thread_pool, CPSGL_Queue &queue)
Definition: psg_evloop.cpp:686
void ProcessItemCallback(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: psg_evloop.cpp:174
bool SendRequest(const CRef< CPSGL_RequestTracker > &tracker)
Definition: psg_evloop.cpp:164
CRef< CRequestContext > m_RequestContext
Definition: psg_evloop.hpp:92
void SetRequestContext(const CRef< CRequestContext > &context)
Definition: psg_evloop.cpp:126
void DeregisterRequest(const CPSGL_RequestTracker *tracker)
Definition: psg_evloop.cpp:139
void ProcessReplyCallback(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: psg_evloop.cpp:183
CPSG_EventLoop m_EventLoop
Definition: psg_evloop.hpp:90
CRef< CPSGL_RequestTracker > GetTracker(const shared_ptr< CPSG_Reply > &reply)
Definition: psg_evloop.cpp:146
unordered_map< const CPSG_Request *, CPSGL_RequestTracker * > m_TrackerMap
Definition: psg_evloop.hpp:94
CFastMutex m_TrackerMapMutex
Definition: psg_evloop.hpp:93
void RegisterRequest(CPSGL_RequestTracker *tracker)
Definition: psg_evloop.cpp:132
thread m_EventLoopThread
Definition: psg_evloop.hpp:91
CPSGL_Queue(const string &service_name)
Definition: psg_evloop.cpp:110
CBackgroundTask(CPSGL_RequestTracker *tracker, EPSG_Status item_status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: psg_evloop.cpp:224
shared_ptr< CPSG_ReplyItem > m_Item
Definition: psg_evloop.cpp:265
CBackgroundTask(CPSGL_RequestTracker *tracker)
Definition: psg_evloop.cpp:233
EStatus Execute() override
Do the actual job.
Definition: psg_evloop.cpp:248
static bool s_IsFinished(EStatus status)
Definition: psg_evloop.cpp:239
CRef< CPSGL_RequestTracker > m_Tracker
Definition: psg_evloop.cpp:263
void OnStatusChange(EStatus old_task_status) override
Callback to notify on changes in the task status.
Definition: psg_evloop.cpp:258
static bool s_IsAborted(EStatus status)
Definition: psg_evloop.cpp:243
CCallbackGuard & operator=(const CCallbackGuard &)=delete
CPSGL_RequestTracker * m_Tracker
Definition: psg_evloop.cpp:307
CCallbackGuard(const CCallbackGuard &)=delete
CCallbackGuard(CPSGL_RequestTracker *tracker)
Definition: psg_evloop.cpp:273
CPSGL_RequestTracker(CPSGL_QueueGuard &queue_guard, const shared_ptr< CPSG_Request > &request, const CRef< CPSGL_Processor > &processor, size_t index=0)
Definition: psg_evloop.cpp:197
void MarkAsNeedsFinalization()
Definition: psg_evloop.cpp:440
CThreadPool_Task::EStatus BackgroundProcessItemCallback(CBackgroundTask *task, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: psg_evloop.cpp:530
void StartProcessReplyInBackground()
Definition: psg_evloop.cpp:378
void MarkAsFinished(CThreadPool_Task::EStatus status)
Definition: psg_evloop.cpp:411
TBackgroundTasks m_BackgroundTasks
Definition: psg_evloop.hpp:187
void OnStatusChange(CBackgroundTask *task, CThreadPool_Task::EStatus old_task_status)
Definition: psg_evloop.cpp:387
EPSG_Status m_ReplyStatus
Definition: psg_evloop.hpp:179
CFastMutex m_TrackerMutex
Definition: psg_evloop.hpp:183
CRef< CPSGL_Processor > m_Processor
Definition: psg_evloop.hpp:176
shared_ptr< CPSG_Reply > m_Reply
Definition: psg_evloop.hpp:181
void QueueInBackground(const CRef< CBackgroundTask > &task)
Definition: psg_evloop.cpp:356
CPSGL_QueueGuard & m_QueueGuard
Definition: psg_evloop.hpp:174
void StartProcessItemInBackground(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: psg_evloop.cpp:369
CSemaphore m_BackgroundTasksSemaphore
Definition: psg_evloop.hpp:185
CThreadPool_Task::EStatus BackgroundProcessReplyCallback(CBackgroundTask *task)
Definition: psg_evloop.cpp:615
shared_ptr< CPSG_Request > m_Request
Definition: psg_evloop.hpp:175
void ProcessItemCallback(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
Definition: psg_evloop.cpp:455
atomic< CThreadPool_Task::EStatus > m_Status
Definition: psg_evloop.hpp:178
CPSGL_ResultGuard FinalizeResult()
Definition: psg_evloop.cpp:654
void ProcessReplyCallback(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
Definition: psg_evloop.cpp:483
CThreadPool_Task::EStatus GetStatus() const
Definition: psg_evloop.hpp:110
unsigned m_BackgroundItemTasks
Definition: psg_evloop.hpp:188
CPSGL_ResultGuard & operator=(CPSGL_ResultGuard &&)
A class derived from the queue class that additionally allows to run event loop.
bool SendRequest(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue.
void Reset()
Stop accepting new requests and cancel all requests whose replies have not been returned yet.
Abstract class for representing single task executing in pool of threads To use this class in applica...
Definition: thread_pool.hpp:76
Main class implementing functionality of pool of threads.
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
const_iterator begin() const
Definition: set.hpp:135
bool empty() const
Definition: set.hpp:133
const_iterator find(const key_type &key) const
Definition: set.hpp:137
void erase(iterator pos)
Definition: set.hpp:151
const_iterator end() const
Definition: set.hpp:136
#define false
Definition: bool.h:36
#define bool
Definition: bool.h:34
#define _TRACE(message)
Definition: ncbidbg.hpp:122
#define _VERIFY(expr)
Definition: ncbidbg.hpp:161
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
TObjectType * GetNCPointer(void) const THROWS_NONE
Get pointer,.
Definition: ncbiobj.hpp:1174
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
Definition: ncbiobj.hpp:2015
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
#define kMax_UInt
Definition: ncbi_limits.h:185
EStatus GetStatus(void) const
Get status of the task.
EStatus
Status of the task.
Definition: thread_pool.hpp:79
void AddTask(CThreadPool_Task *task, const CTimeSpan *timeout=NULL)
Add task to the pool for execution.
@ eFailed
failure during execution
Definition: thread_pool.hpp:84
@ eCompleted
executed successfully
Definition: thread_pool.hpp:83
@ eCanceled
canceled - possible only if canceled before processing was started or if method Execute() returns res...
Definition: thread_pool.hpp:85
void Run(void)
Enter the main loop.
void Wait(void)
Wait on semaphore.
Definition: ncbimtx.cpp:1787
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
@ eInfinite
Infinite deadline.
Definition: ncbitime.hpp:1834
Multi-threading – classes, functions, and features.
@ eIdle
EPSG_Status
Retrieval result.
Definition: psg_client.hpp:626
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
BEGIN_NAMESPACE(objects)
static const char * s_GetRequestTypeName(CPSG_Request::EType type)
Definition: psg_evloop.cpp:718
END_NAMESPACE(psgl)
END_NCBI_NAMESPACE
Definition: psg_evloop.cpp:816
BEGIN_NCBI_NAMESPACE
Definition: psg_evloop.cpp:44
Definition: type.c:6
#define _TROUBLE
#define _ASSERT
Pool of generic task-executing threads.
else result
Definition: token2.c:20
static CS_CONTEXT * context
Definition: will_convert.c:21
Modified on Fri Sep 20 14:57:24 2024 by modify_doxy.py rev. 669887