42 #if defined(HAVE_PSG_LOADER)
111 : m_EventLoop(service_name,
112 bind(&
CPSGL_Queue::ProcessItemCallback, this, placeholders::_1, placeholders::_2),
113 bind(&
CPSGL_Queue::ProcessReplyCallback, this, placeholders::_1, placeholders::_2)),
149 auto iter =
m_TrackerMap.find(reply->GetRequest().get());
153 return Ref(iter->second);
166 shared_ptr<CPSG_Request> request =
tracker->GetRequest();
175 const shared_ptr<CPSG_ReplyItem>& item)
178 tracker->ProcessItemCallback(status, item);
184 const shared_ptr<CPSG_Reply>& reply)
187 tracker->ProcessReplyCallback(status, reply);
198 const shared_ptr<CPSG_Request>& request,
201 : m_QueueGuard(queue_guard),
203 m_Processor(processor),
207 m_NeedsFinalization(
false),
208 m_BackgroundTasksSemaphore(0,
kMax_UInt),
209 m_BackgroundItemTasks(0),
226 const shared_ptr<CPSG_ReplyItem>& item)
303 void*
operator new(size_t) =
delete;
304 void*
operator new[](size_t) =
delete;
334 for (
auto& task : tasks ) {
335 task.GetNCObject().RequestToCancel();
361 if ( task->m_Item ) {
370 const shared_ptr<CPSG_ReplyItem>& item)
372 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::StartProcessItemInBackground()");
380 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::StartProcessReplyInBackground()");
390 auto new_task_status = task->
GetStatus();
417 if ( status > old_status ) {
456 const shared_ptr<CPSG_ReplyItem>& item)
458 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessItemCallback()");
461 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessItemCallback() - canceled");
472 ERR_POST(
"CPSGDataLoader: failed processing reply item: "<<
result);
476 catch ( exception& exc ) {
477 ERR_POST(
"CPSGDataLoader: exception while processing reply item: "<<exc.what());
484 const shared_ptr<CPSG_Reply>& reply)
486 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback()");
489 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback() - canceled");
507 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback(): ProcessReplyFast()");
509 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::ProcessReplyCallback(): ProcessReplyFast(): "<<
result);
522 catch ( exception& exc ) {
523 ERR_POST(
"CPSGDataLoader: exception while processing reply: "<<exc.what());
532 const shared_ptr<CPSG_ReplyItem>& item)
534 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback()");
548 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessItemSlow()");
551 ERR_POST(
"CPSGDataLoader: failed processing reply item: "<<
result);
576 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReplyFast()");
578 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReplyFast(): "<<
result);
590 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReply()");
592 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessItemCallback(): ProcessReplySlow(): "<<
result);
606 catch ( exception& exc ) {
607 ERR_POST(
"CPSGDataLoader: exception while processing reply item: "<<exc.what());
617 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::BackgroundProcessReplyCallback()");
646 catch ( exception& exc ) {
647 ERR_POST(
"CPSGDataLoader: exception while processing reply: "<<exc.what());
659 _TRACE(
"CPSGL_RequestTracker("<<
this<<
", "<<
m_Processor<<
")::FinalizeResult(): calling");
667 catch ( exception& exc ) {
668 ERR_POST(
"CPSGDataLoader: exception while processing reply: "<<exc.what());
688 : m_ThreadPool(thread_pool),
748 ERR_POST(
"CPSGDataLoader: cannot send request");
756 _TRACE(
"CPSGL_QueueGuard::MarkAsFinished(): tracker: "<<
tracker);
786 _TRACE(
"CPSGL_QueueGuard::GetNextResult(): tracker: "<<
tracker);
787 return tracker->FinalizeResult();
virtual EProcessResult ProcessItemSlow(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
virtual EProcessResult ProcessReplyFast(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
virtual EProcessResult ProcessReplySlow(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
virtual EProcessResult ProcessItemFast(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
virtual EProcessResult ProcessReplyFinal()
CRef< CPSGL_Queue > m_Queue
void AddRequest(const shared_ptr< CPSG_Request > &request, const CRef< CPSGL_Processor > &processor, size_t index=0)
set< CRef< CPSGL_RequestTracker > > m_QueuedRequests
CThreadPool & m_ThreadPool
void MarkAsFinished(const CRef< CPSGL_RequestTracker > &request_processor)
list< CRef< CPSGL_RequestTracker > > m_CompleteRequests
CFastMutex m_CompleteMutex
friend class CPSGL_RequestTracker
CRef< CPSGL_RequestTracker > GetQueuedRequest()
CPSGL_ResultGuard GetNextResult()
CSemaphore m_CompleteSemaphore
CPSGL_QueueGuard(CThreadPool &thread_pool, CPSGL_Queue &queue)
void ProcessItemCallback(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
bool SendRequest(const CRef< CPSGL_RequestTracker > &tracker)
CRef< CRequestContext > m_RequestContext
void SetRequestContext(const CRef< CRequestContext > &context)
void DeregisterRequest(const CPSGL_RequestTracker *tracker)
void ProcessReplyCallback(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
CPSG_EventLoop m_EventLoop
CRef< CPSGL_RequestTracker > GetTracker(const shared_ptr< CPSG_Reply > &reply)
unordered_map< const CPSG_Request *, CPSGL_RequestTracker * > m_TrackerMap
CFastMutex m_TrackerMapMutex
void RegisterRequest(CPSGL_RequestTracker *tracker)
CPSGL_Queue(const string &service_name)
CBackgroundTask(CPSGL_RequestTracker *tracker, EPSG_Status item_status, const shared_ptr< CPSG_ReplyItem > &item)
shared_ptr< CPSG_ReplyItem > m_Item
CBackgroundTask(CPSGL_RequestTracker *tracker)
EStatus Execute() override
Do the actual job.
static bool s_IsFinished(EStatus status)
CRef< CPSGL_RequestTracker > m_Tracker
void OnStatusChange(EStatus old_task_status) override
Callback to notify on changes in the task status.
static bool s_IsAborted(EStatus status)
CCallbackGuard & operator=(const CCallbackGuard &)=delete
CPSGL_RequestTracker * m_Tracker
CCallbackGuard(const CCallbackGuard &)=delete
CCallbackGuard(CPSGL_RequestTracker *tracker)
CPSGL_RequestTracker(CPSGL_QueueGuard &queue_guard, const shared_ptr< CPSG_Request > &request, const CRef< CPSGL_Processor > &processor, size_t index=0)
void MarkAsNeedsFinalization()
CThreadPool_Task::EStatus BackgroundProcessItemCallback(CBackgroundTask *task, EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
void StartProcessReplyInBackground()
void MarkAsFinished(CThreadPool_Task::EStatus status)
void WaitForBackgroundTasks()
TBackgroundTasks m_BackgroundTasks
void OnStatusChange(CBackgroundTask *task, CThreadPool_Task::EStatus old_task_status)
EPSG_Status m_ReplyStatus
void CancelBackgroundTasks()
CFastMutex m_TrackerMutex
CRef< CPSGL_Processor > m_Processor
shared_ptr< CPSG_Reply > m_Reply
void QueueInBackground(const CRef< CBackgroundTask > &task)
CPSGL_QueueGuard & m_QueueGuard
void StartProcessItemInBackground(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
CSemaphore m_BackgroundTasksSemaphore
CThreadPool_Task::EStatus BackgroundProcessReplyCallback(CBackgroundTask *task)
shared_ptr< CPSG_Request > m_Request
void ProcessItemCallback(EPSG_Status status, const shared_ptr< CPSG_ReplyItem > &item)
atomic< CThreadPool_Task::EStatus > m_Status
CPSGL_ResultGuard FinalizeResult()
void ProcessReplyCallback(EPSG_Status status, const shared_ptr< CPSG_Reply > &reply)
CThreadPool_Task::EStatus GetStatus() const
unsigned m_BackgroundItemTasks
CPSGL_ResultGuard & operator=(CPSGL_ResultGuard &&)
A class derived from the queue class that additionally allows to run event loop.
bool SendRequest(shared_ptr< CPSG_Request > request, CDeadline deadline)
Push request into the queue.
void Reset()
Stop accepting new requests and cancel all requests whose replies have not been returned yet.
Abstract class for representing single task executing in pool of threads To use this class in applica...
Main class implementing functionality of pool of threads.
iterator_bool insert(const value_type &val)
const_iterator begin() const
const_iterator find(const key_type &key) const
const_iterator end() const
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
TObjectType * GetNCPointer(void) const THROWS_NONE
Get pointer,.
CRef< C > Ref(C *object)
Helper functions to get CRef<> and CConstRef<> objects.
void Reset(void)
Reset reference object.
EStatus GetStatus(void) const
Get status of the task.
EStatus
Status of the task.
void AddTask(CThreadPool_Task *task, const CTimeSpan *timeout=NULL)
Add task to the pool for execution.
@ eFailed
failure during execution
@ eCompleted
executed successfully
@ eCanceled
canceled - possible only if canceled before processing was started or if method Execute() returns res...
void Run(void)
Enter the main loop.
void Wait(void)
Wait on semaphore.
void Post(unsigned int count=1)
Increment the semaphore by "count".
@ eInfinite
Infinite deadline.
Multi-threading – classes, functions, and features.
EPSG_Status
Retrieval result.
@ eInProgress
Retrieval is not finalized yet, more info may come.
@ eError
An error was encountered while trying to send request or to read and to process the reply.
static const char * s_GetRequestTypeName(CPSG_Request::EType type)
Pool of generic task-executing threads.
static CS_CONTEXT * context