NCBI C++ ToolKit
ns_notifications.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef NS_GET_NOTIFICATIONS__HPP
2 #define NS_GET_NOTIFICATIONS__HPP
3 
4 /* $Id: ns_notifications.hpp 80155 2017-11-13 16:29:35Z satskyse $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Anatoliy Kuznetsov, Sergey Satskiy
30  *
31  * File Description: Support for notifying clients which wait for a job
32  * after issuing the WGET command. The support includes
33  * a list of active notifications and a thread which sends
34  * notifications.
35  */
36 
37 
38 #include <corelib/ncbimtx.hpp>
39 #include <corelib/ncbithr.hpp>
40 
41 #include <list>
42 
43 #include "ns_types.hpp"
44 #include "ns_precise_time.hpp"
45 
46 
48 
49 class CNSClientId;
50 class CNSClientsRegistry;
52 class CNSGroupsRegistry;
53 class CQueueDataBase;
54 class CNSScopeRegistry;
55 class CJob;
56 
57 
58 
60 {
 // Per-listener notification record. The listener containers declared
 // further down in this file are list<SNSNotificationAttributes>, which is
 // presumably this type.
 // NOTE(review): the declaration line and several members are absent from
 // this listing (the embedded numbering skips 59, 61, 65, 71, 77 and 84);
 // consult the original header before relying on the layout shown here.
62 
63  unsigned int m_Address; // listener address (paired with m_Port)
64  unsigned short m_Port; // listener port (paired with m_Address)
66 
67  string m_ClientNode; // Non-empty for the new style clients
68  bool m_WnodeAff; // true if the node preferred affinities
69  // have to be considered.
70  bool m_AnyJob; // true if any job is suitable.
72 
73  bool m_NewFormat; // If true, then the new format of
74  // notifications is used. The new format
75  // is for clients which use the GET2 command;
76  // the old format is for clients which use WGET.
78 
79  // Support for two-stage (different frequency) notifications.
80  // The first stage is frequent (fast) within a configured timeout from the
81  // moment when a job is available.
82  // The second stage is infrequent (slow) till the end of the notifications.
83 
85  bool m_SlowRate; // true if the client did not come after
86  // the fast notifications period.
 // NOTE(review): undocumented in the original; presumably a counter used
 // while in the slow-rate stage -- confirm against the implementation.
87  unsigned int m_SlowRateCount;
88 
 // Produces a string describing this record; the three registries are
 // taken by const reference and the method itself is const.
 // NOTE(review): is_active presumably tells whether the record comes from
 // the active listeners list and verbose presumably selects the level of
 // detail -- confirm against the implementation.
89  string Print(const CNSClientsRegistry & clients_registry,
90  const CNSAffinityRegistry & aff_registry,
91  const CNSGroupsRegistry & group_registry,
92  bool is_active,
93  bool verbose) const;
94 };
95 
96 
97 // The structure holds information about a one-time notification
98 // which is going to be sent at an exactly specified time.
// NOTE(review): the declaration line and some members are absent from this
// listing (the embedded numbering skips 99, 101, 104 and 105). The record is
// presumably SExactTimeNotification, the element type of
// m_ExactTimeNotifications -- confirm against the original header.
100 {
102  unsigned int m_Address; // destination address (paired with m_Port)
103  unsigned short m_Port; // destination port (paired with m_Address)
106 };
107 
108 
109 
110 // The structure holds information about notifications which are
111 // triggered by worker node requests while the queue was paused.
// NOTE(review): the declaration line and at least one member are absent from
// this listing (the embedded numbering skips 112 and 116). The record is
// presumably SQueueResumeNotification, the element type of
// m_QueueResumeNotifications -- confirm against the original header.
113 {
114  unsigned int m_Address; // destination address (paired with m_Port)
115  unsigned short m_Port; // destination port (paired with m_Address)
117 };
118 
119 
// Number of chars in the fixed-size message buffers: the generated member
// index of this header lists char arrays declared with this size
// (m_GetMsgBuffer, m_GetMsgBufferObsoleteVersion, m_ReadMsgBuffer).
120 const size_t k_MessageBufferSize = 512;
121 
123 {
 // Registry of notification listeners. It keeps two lists of
 // SNSNotificationAttributes (passive and active listeners), a list of
 // exact-time notifications and a list of queue-resume notifications, and
 // declares the operations which register/unregister listeners and send
 // notifications.
 // NOTE(review): the class declaration line and a number of members are
 // absent from this listing (gaps in the embedded numbering); the notes
 // below point at the declarations affected.
124  public:
 // NOTE(review): the first line of the constructor declaration is missing
 // here; the generated index lists
 // CNSNotificationList(CQueueDataBase &qdb, const string &ns_node,
 // const string &qname).
126  const string & ns_node,
127  const string & qname);
128 
129  void RegisterListener(const CNSClientId & client,
130  unsigned short port,
131  unsigned int timeout,
132  bool wnode_aff,
133  bool any_job,
134  bool exclusive_new_affinity,
135  bool new_format,
136  const TNSBitVector & groups,
137  ECommandGroup cmd_group);
 // NOTE(review): the first line of this declaration is missing here; the
 // generated index lists UnregisterListener(const CNSClientId &client,
 // unsigned short port, ECommandGroup cmd_group).
139  unsigned short port,
140  ECommandGroup cmd_group);
 // Overload which identifies the listener by address and port.
141  void UnregisterListener(unsigned int address,
142  unsigned short port,
143  ECommandGroup cmd_group);
144  void NotifyJobChanges(unsigned int address,
145  unsigned short port,
146  const string & notification);
147 
148  string BuildJobChangedNotification(const CJob & job,
149  const string & job_key,
150  TJobStatus job_status,
151  ENotificationReason reason);
152  void CheckTimeout(const CNSPreciseTime & current_time,
153  CNSClientsRegistry & clients_registry,
154  ECommandGroup cmd_group);
155  void NotifyPeriodically(const CNSPreciseTime & current_time,
156  unsigned int notif_lofreq_mult,
157  CNSClientsRegistry & clients_registry);
158  void CheckOutdatedJobs(const TNSBitVector & outdated_jobs,
159  CNSClientsRegistry & clients_registry,
160  const CNSPreciseTime & notif_highfreq_period,
161  ECommandGroup cmd_group);
 // Notify() overload for a single job id / affinity id.
162  void Notify(unsigned int job_id,
163  unsigned int aff_id,
164  CNSClientsRegistry & clients_registry,
165  CNSAffinityRegistry & aff_registry,
166  CNSGroupsRegistry & group_registry,
167  CNSScopeRegistry & scope_registry,
168  const CNSPreciseTime & notif_highfreq_period,
169  const CNSPreciseTime & notif_handicap,
170  ECommandGroup cmd_group);
 // Notify() overload for bit vectors of jobs and affinities.
171  void Notify(const TNSBitVector & jobs,
172  const TNSBitVector & affinities,
173  bool no_aff_jobs,
174  CNSClientsRegistry & clients_registry,
175  CNSAffinityRegistry & aff_registry,
176  CNSGroupsRegistry & group_registry,
177  CNSScopeRegistry & scope_registry,
178  const CNSPreciseTime & notif_highfreq_period,
179  const CNSPreciseTime & notif_handicap,
180  ECommandGroup cmd_group);
181  void onQueueResumed(bool any_pending);
182  string Print(const CNSClientsRegistry & clients_registry,
183  const CNSAffinityRegistry & aff_registry,
184  const CNSGroupsRegistry & group_registry,
185  bool verbose) const;
 // Returns the sum of the sizes of the passive and the active listener
 // lists.
 // NOTE(review): listing line 188 (inside this body) is missing; check the
 // original header for what precedes the return statement.
186  size_t size(void) const
187  {
189  return m_PassiveListeners.size() + m_ActiveListeners.size();
190  }
191 
192  void ClearExactGetNotifications(void);
 // NOTE(review): listing line 193 is missing here; the generated index
 // lists CNSPreciseTime NotifyExactListeners(void), which is presumably the
 // declaration that belongs at this spot.
194 
195  void AddToQueueResumedNotifications(unsigned int address,
196  unsigned short port,
197  bool new_format);
 // NOTE(review): the return type line is missing here; the generated index
 // gives CNSPreciseTime as the return type of
 // GetPassiveNotificationLifetime.
199  GetPassiveNotificationLifetime(unsigned int address,
200  unsigned short port,
201  ECommandGroup cmd_group) const;
202 
203  private:
204  void x_AddToExactNotifications(unsigned int address,
205  unsigned short port,
206  const CNSPreciseTime & when,
207  bool new_format,
208  ECommandGroup reason);
209  void x_SendNotificationPacket(unsigned int address,
210  unsigned short port,
211  bool new_format,
212  ECommandGroup reason);
213  bool x_TestTimeout(
214  const CNSPreciseTime & current_time,
215  CNSClientsRegistry & clients_registry,
216  list<SNSNotificationAttributes> & container,
217  list<SNSNotificationAttributes>::iterator & record);
218  bool x_IsInExactList(unsigned int address, unsigned short port) const;
219 
220  private:
 // Listener records, kept in two separate lists.
221  list<SNSNotificationAttributes> m_PassiveListeners;
222  list<SNSNotificationAttributes> m_ActiveListeners;
224 
225  list<SExactTimeNotification> m_ExactTimeNotifications;
227 
228  list<SQueueResumeNotification> m_QueueResumeNotifications;
230 
 // Looks a listener up in the given container by address, port and
 // command group; returns an iterator into that container.
231  list<SNSNotificationAttributes>::iterator
232  x_FindListener(list<SNSNotificationAttributes> & container,
233  unsigned int address,
234  unsigned short port,
235  ECommandGroup cmd_group);
236 
 // NOTE(review): the remaining data members (sockets, their locks, message
 // buffers, etc. per the generated index) and the copy constructor /
 // assignment declarations are missing from this listing.
245 
249 
251 
252  private:
255 };
256 
257 
258 
259 // Forward declaration for CGetJobNotificationThread
260 class CQueueDataBase;
261 
262 // The thread for notifying listeners which wait for a job
263 // after issuing the WGET command.
// NOTE(review): the class declaration line and several members are absent
// from this listing (gaps in the embedded numbering); the notes below point
// at the declarations affected.
265 {
266  public:
 // NOTE(review): the first line of the constructor declaration is missing
 // here; the generated index lists
 // CGetJobNotificationThread(CQueueDataBase &qdb, unsigned int sec_delay,
 // unsigned int nanosec_delay, const bool &logging).
268  unsigned int sec_delay,
269  unsigned int nanosec_delay,
270  const bool & logging);
272 
273  void RequestStop(void);
274  void WakeUp(void);
275 
276  private:
277  void x_DoJob(void);
 // NOTE(review): listing line 278 is missing here; the generated index
 // lists CNSPreciseTime x_ProcessExactTimeNotifications(void), which is
 // presumably the declaration that belongs at this spot.
279 
280  protected:
 // The thread function which a derived thread class has to provide.
281  virtual void * Main(void);
282 
283  private:
 // Reference member: it refers to a bool owned elsewhere, so the referenced
 // object has to outlive this thread object.
285  const bool & m_NotifLogging;
288 
289  private:
 // NOTE(review): a plain (non-atomic) bool; if RequestStop() writes it from
 // one thread while the thread function reads it, the access needs
 // synchronization -- confirm in the implementation.
291  mutable bool m_StopFlag;
292 
293  private:
 // NOTE(review): the copy constructor and assignment operator declarations
 // named in the generated index are missing from this listing.
297 };
298 
299 
301 
302 #endif
303 
CDatagramSocket::
CFastMutex –.
Definition: ncbimtx.hpp:667
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CGetJobNotificationThread(const CGetJobNotificationThread &)
CGetJobNotificationThread & operator=(const CGetJobNotificationThread &)
CNSPreciseTime x_ProcessExactTimeNotifications(void)
CGetJobNotificationThread(CQueueDataBase &qdb, unsigned int sec_delay, unsigned int nanosec_delay, const bool &logging)
Definition: job.hpp:183
CMutex –.
Definition: ncbimtx.hpp:749
string BuildJobChangedNotification(const CJob &job, const string &job_key, TJobStatus job_status, ENotificationReason reason)
CFastMutex m_GetAndReadNotificationSocketLock
void x_SendNotificationPacket(unsigned int address, unsigned short port, bool new_format, ECommandGroup reason)
list< SExactTimeNotification > m_ExactTimeNotifications
char m_GetMsgBufferObsoleteVersion[k_MessageBufferSize]
CDatagramSocket m_GetAndReadNotificationSocket
void onQueueResumed(bool any_pending)
CNSPreciseTime GetPassiveNotificationLifetime(unsigned int address, unsigned short port, ECommandGroup cmd_group) const
char m_ReadMsgBuffer[k_MessageBufferSize]
void Notify(unsigned int job_id, unsigned int aff_id, CNSClientsRegistry &clients_registry, CNSAffinityRegistry &aff_registry, CNSGroupsRegistry &group_registry, CNSScopeRegistry &scope_registry, const CNSPreciseTime &notif_highfreq_period, const CNSPreciseTime &notif_handicap, ECommandGroup cmd_group)
void AddToQueueResumedNotifications(unsigned int address, unsigned short port, bool new_format)
list< SNSNotificationAttributes > m_PassiveListeners
void x_AddToExactNotifications(unsigned int address, unsigned short port, const CNSPreciseTime &when, bool new_format, ECommandGroup reason)
void NotifyJobChanges(unsigned int address, unsigned short port, const string &notification)
CNSNotificationList(const CNSNotificationList &)
void UnregisterListener(const CNSClientId &client, unsigned short port, ECommandGroup cmd_group)
void CheckOutdatedJobs(const TNSBitVector &outdated_jobs, CNSClientsRegistry &clients_registry, const CNSPreciseTime &notif_highfreq_period, ECommandGroup cmd_group)
void ClearExactGetNotifications(void)
list< SQueueResumeNotification > m_QueueResumeNotifications
CQueueDataBase & m_QueueDB
bool x_TestTimeout(const CNSPreciseTime &current_time, CNSClientsRegistry &clients_registry, list< SNSNotificationAttributes > &container, list< SNSNotificationAttributes >::iterator &record)
void CheckTimeout(const CNSPreciseTime &current_time, CNSClientsRegistry &clients_registry, ECommandGroup cmd_group)
void NotifyPeriodically(const CNSPreciseTime &current_time, unsigned int notif_lofreq_mult, CNSClientsRegistry &clients_registry)
CFastMutex m_StatusNotificationSocketLock
CDatagramSocket m_StatusNotificationSocket
list< SNSNotificationAttributes > m_ActiveListeners
char m_GetMsgBuffer[k_MessageBufferSize]
list< SNSNotificationAttributes >::iterator x_FindListener(list< SNSNotificationAttributes > &container, unsigned int address, unsigned short port, ECommandGroup cmd_group)
void RegisterListener(const CNSClientId &client, unsigned short port, unsigned int timeout, bool wnode_aff, bool any_job, bool exclusive_new_affinity, bool new_format, const TNSBitVector &groups, ECommandGroup cmd_group)
CNSNotificationList & operator=(const CNSNotificationList &)
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool verbose) const
size_t size(void) const
CNSPreciseTime NotifyExactListeners(void)
CNSNotificationList(CQueueDataBase &qdb, const string &ns_node, const string &qname)
bool x_IsInExactList(unsigned int address, unsigned short port) const
CSemaphore –.
Definition: ncbimtx.hpp:1375
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
EJobStatus
Job status codes.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
Multi-threading – mutexes; rw-locks; semaphore.
Multi-threading – classes, functions, and features.
const size_t k_MessageBufferSize
ENotificationReason
Definition: ns_types.hpp:62
ECommandGroup
Definition: ns_types.hpp:54
true_type verbose
Definition: processing.cpp:890
static CNamedPipeClient * client
CNSPreciseTime m_HifreqNotifyLifetime
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool is_active, bool verbose) const
Modified on Sat Jun 08 14:24:48 2024 by modify_doxy.py rev. 669887