NCBI C++ ToolKit
ns_clients_registry.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef NETSCHEDULE_CLIENTS_REGISTRY__HPP
2 #define NETSCHEDULE_CLIENTS_REGISTRY__HPP
3 
4 /* $Id: ns_clients_registry.hpp 79141 2017-08-15 13:30:30Z satskyse $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Sergey Satskiy
30  *
31  * File Description:
32  * NetSchedule clients registry
33  *
34  */
35 
36 #include <corelib/ncbimtx.hpp>
37 
38 #include "ns_clients.hpp"
39 
40 #include <map>
41 #include <string>
42 
43 
45 
46 
47 class CQueue;
49 class CJobStatusTracker;
51 
52 
53 // The CNSClientsRegistry serves all the queue clients.
    // It keeps one CNSClient record per client node name in m_Clients and
    // declares the operations used to create, update, query and garbage
    // collect those records.
    // NOTE(review): in this listing the line that opens the class declaration
    // and the first line of several member declarations are absent. The
    // affected members are marked individually below; the symbol index at the
    // end of the page is the only place their names and leading parameters
    // are visible.
55 {
56  public:
    // Receives pointers to the affinity registry and the notification list.
    // NOTE(review): presumably stored for later use (the symbol index lists
    // m_AffRegistry and m_NotifRegistry members) - confirm in the .cpp file.
58  void SetRegistries(CNSAffinityRegistry * aff_registry,
59  CNSNotificationList * notif_registry);
60 
    // Returns the number of entries currently held in m_Clients.
    // NOTE(review): the inline body reads m_Clients without acquiring m_Lock;
    // confirm that callers tolerate an unsynchronized read.
61  size_t size(void) const
62  { return m_Clients.size(); }
63 
64  // Called before any command is issued by the client.
65  // The client record is created or updated.
    // Every parameter, including 'client', is a non-const reference, so the
    // call is able to modify each of them.
    // NOTE(review): presumably everything after 'client' is an output
    // describing the previous state of the record - confirm.
66  void Touch(CNSClientId & client,
67  TNSBitVector & running_jobs,
68  TNSBitVector & reading_jobs,
69  bool & client_was_found,
70  bool & session_was_reset,
71  string & old_session,
72  bool & had_wn_pref_affs,
73  bool & had_reader_pref_affs);
74 
75  // Methods to update the client records.
76  void MarkAsAdmin(const CNSClientId & client);
77  void AddToSubmitted(const CNSClientId & client,
78  size_t count);
79  void RegisterJob(const CNSClientId & client,
80  unsigned int job_id,
81  ECommandGroup cmd_group);
    // NOTE(review): the opening line of the next declaration is absent from
    // this listing. The symbol index lists
    // RegisterBlacklistedJob(const CNSClientId &, unsigned int, ECommandGroup),
    // whose trailing parameters match - confirm.
83  unsigned int job_id,
84  ECommandGroup cmd_group);
    // Two overloads: the first identifies the client explicitly, the second
    // takes only the job id and the command group.
85  void UnregisterJob(const CNSClientId & client,
86  unsigned int job_id,
87  ECommandGroup cmd_group);
88  void UnregisterJob(unsigned int job_id,
89  ECommandGroup cmd_group);
    // NOTE(review): opening line absent. The symbol index lists
    // MoveJobToBlacklist(const CNSClientId &, unsigned int, ECommandGroup),
    // which would pair with the client-less overload below - confirm.
91  unsigned int job_id,
92  ECommandGroup cmd_group);
93  void MoveJobToBlacklist(unsigned int job_id,
94  ECommandGroup cmd_group);
    // NOTE(review): opening line absent. The symbol index lists a const
    // SubtractBlacklistedJobs(const CNSClientId &, ECommandGroup,
    // TNSBitVector &) - likely this declaration; confirm.
96  ECommandGroup cmd_group,
97  TNSBitVector & bv) const;
    // Overload selecting the client by node name; 'bv' is a non-const
    // reference even though the member function itself is const.
98  void SubtractBlacklistedJobs(const string & client_node,
99  ECommandGroup cmd_group,
100  TNSBitVector & bv) const;
    // NOTE(review): opening line absent. The symbol index lists a const
    // AddBlacklistedJobs(const CNSClientId &, ECommandGroup, TNSBitVector &)
    // - likely this declaration; confirm.
102  ECommandGroup cmd_group,
103  TNSBitVector & bv) const;
    // Overload selecting the client by node name.
104  void AddBlacklistedJobs(const string & client_node,
105  ECommandGroup cmd_group,
106  TNSBitVector & bv) const;
    // Produces a string describing the clients.
    // NOTE(review): the layout of the returned string and the exact role of
    // 'batch_size' are not visible in this header - see the implementation.
107  string PrintClientsList(const CQueue * queue,
108  size_t batch_size,
109  bool verbose) const;
110  void SetNodeWaiting(const CNSClientId & client,
111  unsigned short port,
112  const TNSBitVector & aff_ids,
113  ECommandGroup cmd_group);
    // NOTE(review): opening line absent. The symbol index lists a const
    // GetPreferredAffinities(const CNSClientId &, ECommandGroup) - likely
    // this declaration; confirm.
115  ECommandGroup cmd_group) const;
    // Overload selecting the client by node name; returns the bit vector by
    // value.
116  TNSBitVector GetPreferredAffinities(const string & node,
117  ECommandGroup cmd_group) const;
118 
    // NOTE(review): opening line absent (listing lines 119-120 are missing).
    // The symbol index lists GetAllPreferredAffinities(ECommandGroup) and
    // GetWaitAffinities(const CNSClientId &, ECommandGroup), both const; the
    // fragment below is likely the tail of the latter - confirm.
121  ECommandGroup cmd_group) const;
    // Overload selecting the client by node name; returns the bit vector by
    // value.
122  TNSBitVector GetWaitAffinities(const string & node,
123  ECommandGroup cmd_group) const;
    // NOTE(review): opening line absent. The symbol index lists
    // UpdatePreferredAffinities(const CNSClientId &, const TNSBitVector &,
    // const TNSBitVector &, ECommandGroup) - likely this declaration; confirm.
126  const TNSBitVector & aff_to_add,
127  const TNSBitVector & aff_to_del,
128  ECommandGroup cmd_group);
    // NOTE(review): opening line absent and no index entry with these exact
    // parameters is visible. Presumably a second overload that works on
    // single affinity ids instead of bit vectors - confirm.
130  unsigned int aff_to_add,
131  unsigned int aff_to_del,
132  ECommandGroup cmd_group);
    // NOTE(review): opening line absent. The symbol index lists
    // SetPreferredAffinities(const CNSClientId &, const TNSBitVector &,
    // ECommandGroup) - likely this declaration; confirm.
134  const TNSBitVector & aff_to_set,
135  ECommandGroup cmd_group);
136  bool IsRequestedAffinity(const string & name,
137  const TNSBitVector & aff,
138  bool use_preferred,
139  ECommandGroup cmd_group) const;
140  bool IsPreferredByAny(unsigned int aff_id,
141  ECommandGroup cmd_group) const;
    // Maps a numeric id to a node name.
    // NOTE(review): presumably 'id' is a value produced by x_GetNextID -
    // confirm.
142  string GetNodeName(unsigned int id) const;
143  bool GetAffinityReset(const CNSClientId & client,
144  ECommandGroup cmd_group) const;
    // Takes separate timeouts for worker nodes and for readers.
145  void StaleNodes(const CNSPreciseTime & current_time,
146  const CNSPreciseTime & wn_timeout,
147  const CNSPreciseTime & reader_timeout,
148  bool is_log);
    // Takes a timeout and a minimum count for each of five client
    // categories: worker nodes, admins, submitters, readers and unknowns.
    // NOTE(review): presumably forwards to the x_Purge* helpers declared
    // under "GC methods" in the private section - confirm.
149  void Purge(const CNSPreciseTime & current_time,
150  const CNSPreciseTime & timeout_worker_node,
151  unsigned int min_worker_nodes,
152  const CNSPreciseTime & timeout_admin,
153  unsigned int min_admins,
154  const CNSPreciseTime & timeout_submitter,
155  unsigned int min_submitters,
156  const CNSPreciseTime & timeout_reader,
157  unsigned int min_readers,
158  const CNSPreciseTime & timeout_unknown,
159  unsigned int min_unknowns,
160  bool is_log);
    // NOTE(review): opening line absent. The symbol index lists a const
    // WasGarbageCollected(const CNSClientId &, ECommandGroup) - likely this
    // declaration; confirm.
162  ECommandGroup cmd_group) const;
    // NOTE(review): opening line absent. The symbol index lists
    // SetBlacklistTimeouts(const CNSPreciseTime &, const CNSPreciseTime &),
    // which matches the parameters below.
    // The inline body copies the two arguments into m_BlacklistTimeout and
    // m_ReadBlacklistTimeout respectively.
164  const CNSPreciseTime & blacklist_timeout,
165  const CNSPreciseTime & read_blacklist_timeout)
166  { m_BlacklistTimeout = blacklist_timeout;
167  m_ReadBlacklistTimeout = read_blacklist_timeout; }
169  void SetLastScope(const CNSClientId & client);
170  void AppendType(const CNSClientId & client,
171  unsigned int type_to_append);
    // NOTE(review): opening line absent. The symbol index lists
    // GCBlacklistedJobs(const CJobStatusTracker &, ECommandGroup) - likely
    // this declaration; confirm.
173  ECommandGroup cmd_group);
    // NOTE(review): the meaning of the int return value is not visible in
    // this header - see the implementation.
174  int SetClientData(const CNSClientId & client,
175  const string & data, int data_version);
    // Three overloads, selecting the client by CNSClient record, by
    // CNSClientId, or by node name. 'touch_notif_registry' defaults to true
    // in the first and third; the CNSClientId overload has no such
    // parameter.
176  bool CancelWaiting(CNSClient & client, ECommandGroup cmd_group,
177  bool touch_notif_registry = true);
178  bool CancelWaiting(const CNSClientId & client,
179  ECommandGroup cmd_group);
180  bool CancelWaiting(const string & name,
181  ECommandGroup cmd_group,
182  bool touch_notif_registry = true);
    // Two overloads that differ only in how the client is identified
    // (CNSClientId versus node name). All remaining parameters are non-const
    // references.
    // NOTE(review): presumably outputs describing the cleared record -
    // confirm.
183  void ClearClient(const CNSClientId & client,
184  TNSBitVector & running_jobs,
185  TNSBitVector & reading_jobs,
186  bool & client_was_found,
187  string & old_session,
188  bool & had_wn_pref_affs,
189  bool & had_reader_pref_affs);
190  void ClearClient(const string & node_name,
191  TNSBitVector & running_jobs,
192  TNSBitVector & reading_jobs,
193  bool & client_was_found,
194  string & old_session,
195  bool & had_wn_pref_affs,
196  bool & had_reader_pref_affs);
    // NOTE(review): opening line absent. The symbol index lists
    // ClearOnTimeout(CNSClient &, const string &, bool, ECommandGroup) -
    // likely this declaration; confirm.
198  const string & client_node,
199  bool is_log,
200  ECommandGroup cmd_group);
    // 'scope' and 'virtual_scope' are non-const references.
    // NOTE(review): presumably outputs for the given client node - confirm.
201  void GetScopes(const string & client_node,
202  string & scope, string & virtual_scope);
203 
204  private:
205  map< string, CNSClient > m_Clients; // All the queue clients
206  // netschedule knows about
207  // ClientNode -> struct {}
208  // e.g. service10:9300 -> {}
209  // Garbage collected worker nodes which had affinities reset
    // NOTE(review): the member described by the comment above is absent
    // from this listing; the symbol index lists set< string >
    // m_GCWNodeClients.
211  // Garbage collected readers which had preferred affinities
    // NOTE(review): the member described by the comment above is absent
    // from this listing; the symbol index lists set< string >
    // m_GCReaderClients.
    // Declared mutable, which allows it to be acquired inside const member
    // functions.
213  mutable CMutex m_Lock; // Lock for the map
214 
215  // Client IDs support
216  unsigned int m_LastID;
218  TNSBitVector m_RegisteredClients; // The identifiers
219  // of all the clients
220  // which are currently
221  // in the registry
222  // Union of all affinities assigned to all WNodes
    // NOTE(review): the member described by the comment above is absent
    // from this listing.
224  // Union of all affinities assigned to all readers
    // NOTE(review): the member described by the comment above is absent
    // from this listing, as are the following few members (the symbol index
    // lists m_AffRegistry, m_NotifRegistry, m_BlacklistTimeout and
    // m_ReadBlacklistTimeout).
226 
229 
232 
    // NOTE(review): presumably hands out the next client identifier based on
    // m_LastID - confirm.
233  unsigned int x_GetNextID(void);
234 
235  string x_PrintSelected(const TNSBitVector & batch,
236  const CQueue * queue,
237  bool verbose) const;
238 
239  void x_BuildAffinities(ECommandGroup cmd_group);
    // Works on a single job vector and a single 'had_pref_affs' flag for the
    // given command group, whereas the public ClearClient overloads carry
    // separate running/reading vectors and worker-node/reader flags.
240  void x_ClearClient(const string & node_name,
241  CNSClient & client,
242  TNSBitVector & jobs,
243  bool & had_pref_affs,
244  ECommandGroup cmd_group);
245  bool x_CouldBeStale(const CNSPreciseTime & current_time,
246  const CNSPreciseTime & timeout,
247  const CNSClient & client,
248  ECommandGroup cmd_group);
249 
250  // GC methods
    // NOTE(review): opening line absent. The symbol index lists
    // x_PurgeWNodesAndReaders(...) with exactly these six parameters.
252  const CNSPreciseTime & current_time,
253  const CNSPreciseTime & timeout_worker_node,
254  unsigned int min_worker_nodes,
255  const CNSPreciseTime & timeout_reader,
256  unsigned int min_readers,
257  bool is_log);
258  void x_PurgeAdmins(const CNSPreciseTime & current_time,
259  const CNSPreciseTime & timeout_admin,
260  unsigned int min_admins,
261  bool is_log);
262  void x_PurgeSubmitters(const CNSPreciseTime & current_time,
263  const CNSPreciseTime & timeout_submitter,
264  unsigned int min_submitters,
265  bool is_log);
266  void x_PurgeUnknowns(const CNSPreciseTime & current_time,
267  const CNSPreciseTime & timeout_unknown,
268  unsigned int min_unknowns,
269  bool is_log);
    // Same shape as the three helpers above plus an explicit 'client_type'.
    // NOTE(review): presumably the shared implementation they delegate to -
    // confirm.
270  void x_PurgeInactiveClients(const CNSPreciseTime & current_time,
271  const CNSPreciseTime & timeout,
272  unsigned int min_clients,
273  unsigned int client_type,
274  bool is_log);
275 };
276 
277 
278 
280 
281 #endif /* NETSCHEDULE_CLIENTS_REGISTRY__HPP */
282 
CFastMutex –.
Definition: ncbimtx.hpp:667
CMutex –.
Definition: ncbimtx.hpp:749
void x_ClearClient(const string &node_name, CNSClient &client, TNSBitVector &jobs, bool &had_pref_affs, ECommandGroup cmd_group)
void AppendType(const CNSClientId &client, unsigned int type_to_append)
bool IsPreferredByAny(unsigned int aff_id, ECommandGroup cmd_group) const
CNSAffinityRegistry * m_AffRegistry
void RegisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void x_BuildAffinities(ECommandGroup cmd_group)
void SetPreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_set, ECommandGroup cmd_group)
set< string > m_GCReaderClients
CNSPreciseTime m_ReadBlacklistTimeout
bool x_CouldBeStale(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout, const CNSClient &client, ECommandGroup cmd_group)
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
void Purge(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_admin, unsigned int min_admins, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, const CNSPreciseTime &timeout_reader, unsigned int min_readers, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
set< string > m_GCWNodeClients
void StaleNodes(const CNSPreciseTime &current_time, const CNSPreciseTime &wn_timeout, const CNSPreciseTime &reader_timeout, bool is_log)
TNSBitVector GetRegisteredClients(void) const
string x_PrintSelected(const TNSBitVector &batch, const CQueue *queue, bool verbose) const
void AddToSubmitted(const CNSClientId &client, size_t count)
void x_PurgeWNodesAndReaders(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_worker_node, unsigned int min_worker_nodes, const CNSPreciseTime &timeout_reader, unsigned int min_readers, bool is_log)
CNSNotificationList * m_NotifRegistry
string PrintClientsList(const CQueue *queue, size_t batch_size, bool verbose) const
void UnregisterJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearOnTimeout(CNSClient &client, const string &client_node, bool is_log, ECommandGroup cmd_group)
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
void x_PurgeInactiveClients(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout, unsigned int min_clients, unsigned int client_type, bool is_log)
void RegisterBlacklistedJob(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
size_t size(void) const
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
void SetBlacklistTimeouts(const CNSPreciseTime &blacklist_timeout, const CNSPreciseTime &read_blacklist_timeout)
map< string, CNSClient > m_Clients
TNSBitVector GetWaitAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
void GetScopes(const string &client_node, string &scope, string &virtual_scope)
bool IsRequestedAffinity(const string &name, const TNSBitVector &aff, bool use_preferred, ECommandGroup cmd_group) const
void MoveJobToBlacklist(const CNSClientId &client, unsigned int job_id, ECommandGroup cmd_group)
void ClearClient(const CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
string GetNodeName(unsigned int id) const
CNSPreciseTime m_BlacklistTimeout
void GCBlacklistedJobs(const CJobStatusTracker &tracker, ECommandGroup cmd_group)
void x_PurgeAdmins(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_admin, unsigned int min_admins, bool is_log)
void UpdatePreferredAffinities(const CNSClientId &client, const TNSBitVector &aff_to_add, const TNSBitVector &aff_to_del, ECommandGroup cmd_group)
void Touch(CNSClientId &client, TNSBitVector &running_jobs, TNSBitVector &reading_jobs, bool &client_was_found, bool &session_was_reset, string &old_session, bool &had_wn_pref_affs, bool &had_reader_pref_affs)
bool GetAffinityReset(const CNSClientId &client, ECommandGroup cmd_group) const
void SetLastScope(const CNSClientId &client)
TNSBitVector m_RegisteredClients
void MarkAsAdmin(const CNSClientId &client)
void x_PurgeSubmitters(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_submitter, unsigned int min_submitters, bool is_log)
void RegisterSocketWriteError(const CNSClientId &client)
int SetClientData(const CNSClientId &client, const string &data, int data_version)
unsigned int x_GetNextID(void)
void SetRegistries(CNSAffinityRegistry *aff_registry, CNSNotificationList *notif_registry)
void SetNodeWaiting(const CNSClientId &client, unsigned short port, const TNSBitVector &aff_ids, ECommandGroup cmd_group)
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
bool WasGarbageCollected(const CNSClientId &client, ECommandGroup cmd_group) const
void x_PurgeUnknowns(const CNSPreciseTime &current_time, const CNSPreciseTime &timeout_unknown, unsigned int min_unknowns, bool is_log)
void AddBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
size_type size() const
Definition: map.hpp:148
char data[12]
Definition: iconv.c:80
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
Multi-threading – mutexes; rw-locks; semaphore.
ECommandGroup
Definition: ns_types.hpp:54
true_type verbose
Definition: processing.cpp:890
static CNamedPipeClient * client
Modified on Sat May 18 11:35:20 2024 by modify_doxy.py rev. 669887