NCBI C++ ToolKit
ns_clients.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_clients.cpp 91172 2020-09-21 19:07:12Z satskyse $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  * NetSchedule clients registry and supporting facilities
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
36 #include <connect/ncbi_socket.hpp>
37 #include <corelib/ncbi_limits.hpp>
38 #include <util/checksum.hpp>
39 
40 #include "ns_clients.hpp"
41 #include "ns_queue.hpp"
42 #include "queue_vc.hpp"
43 #include "ns_affinity.hpp"
44 #include "job_status.hpp"
45 #include "ns_server.hpp"
46 
47 
49 
50 
51 const string kVirtualScopePrefix = "WN_SCOPE:::";
52 
53 
54 // The CNSClientId serves two types of clients:
55 // - old style clients; they have peer address only
56 // - new style clients; they have all three pieces,
57 // address, node id and session id
59  m_Addr(0), m_ClientType(eClaimedNotProvided),
60  m_ControlPort(0), m_PassedChecks(0), m_ID(0)
61 {}
62 
63 
64 
65 // Used instead of a constructor - when the queue is given
66 // during the handshake phase.
67 void CNSClientId::Update(unsigned int peer_addr,
68  const TNSProtoParams & params)
69 {
71 
72  m_Addr = peer_addr;
80  m_ControlPort = 0;
81  m_PassedChecks = 0;
82  m_ID = 0;
83 
84  found = params.find("client_node");
85  if (found != params.end())
86  m_ClientNode = NStr::ParseEscapes(found->second);
87 
88  found = params.find("client_session");
89  if (found != params.end())
90  m_ClientSession = NStr::ParseEscapes(found->second);
91 
92  found = params.find("client_type");
93  if (found != params.end())
94  m_ClientType = x_ConvertToClaimedType(found->second);
95 
96  // It must be that either both client_node and client_session
97  // parameters are provided or none of them
98  if (m_ClientNode.empty() && !m_ClientSession.empty())
99  NCBI_THROW(CNetScheduleException, eAuthenticationError,
100  "client_session is provided but client_node is not");
101  if (!m_ClientNode.empty() && m_ClientSession.empty())
102  NCBI_THROW(CNetScheduleException, eAuthenticationError,
103  "client_node is provided but client_session is not");
104 
105  if (!m_ClientNode.empty())
107  "client_node");
108  if (!m_ClientSession.empty())
110  "client_session");
111 
112  found = params.find("prog");
113  if (found != params.end())
114  m_ProgName = NStr::ParseEscapes(found->second);
115 
116  found = params.find("client");
117  if (found != params.end())
118  m_ClientName = NStr::ParseEscapes(found->second);
119 
120  found = params.find("scope");
121  if (found != params.end())
122  m_Scope = NStr::ParseEscapes(found->second);
123 
124  found = params.find("control_port");
125  if (found != params.end()) {
126  try {
127  m_ControlPort =
128  NStr::StringToNumeric<unsigned short>(found->second);
129  } catch (...) {
130  ERR_POST("Error converting client control port. "
131  "Expected control_port=<unsigned short>, "
132  "received control_port=" << found->second);
133  m_ControlPort = 0;
134  }
135  }
136 
137  found = params.find("client_host");
138  if (found != params.end())
139  m_ClientHost = NStr::ParseEscapes(found->second);
140 }
141 
142 
143 // true if it is a new client identification
144 bool CNSClientId::IsComplete(void) const
145 {
146  // It is gauranteed in the constructor that both
147  // m_ClientNode and m_ClientSession are empty or not empty together.
148  return !m_ClientNode.empty();
149 }
150 
151 
152 // See CXX-5324 for what virtual scope is
154 {
155  if (IsComplete())
157  return "";
158 }
159 
160 
161 // This is for rude clients which provide the first (i.e. authorization)
162 // in the format that cannot be parsed. In this case the whole line
163 // is considered to be the client name.
164 void CNSClientId::SetClientName(const string & client_name)
165 {
166  m_ClientName = client_name;
167 }
168 
169 
171  CNetScheduleServer * server,
172  const string & cmd)
173 {
174  if (cmd_reqs & eNS_Queue) {
175  if ((m_PassedChecks & eNS_Queue) == 0)
176  NCBI_THROW(CNetScheduleException, eInvalidParameter,
177  "Invalid parameter: queue required");
178  }
179 
180  if (IsAdmin())
181  return; // Admin can do everything
182 
183  if (cmd_reqs & eNS_Admin) {
184  if ((m_PassedChecks & eNS_Admin) == 0) {
185  server->RegisterAlert(eAccess, "admin privileges required "
186  "to execute " + cmd);
187  NCBI_THROW(CNetScheduleException, eAccessDenied,
188  "Access denied: admin privileges required");
189  }
190  }
191 
192  if (cmd_reqs & eNS_Submitter) {
193  if ((m_PassedChecks & eNS_Submitter) == 0) {
194  server->RegisterAlert(eAccess, "submitter privileges required "
195  "to execute " + cmd);
196  NCBI_THROW(CNetScheduleException, eAccessDenied,
197  "Access denied: submitter privileges required");
198  }
199  }
200 
201  if (cmd_reqs & eNS_Worker) {
202  if ((m_PassedChecks & eNS_Worker) == 0) {
203  server->RegisterAlert(eAccess, "worker node privileges required "
204  "to execute " + cmd);
205  NCBI_THROW(CNetScheduleException, eAccessDenied,
206  "Access denied: worker node privileges required");
207  }
208  }
209 
210  if (cmd_reqs & eNS_Reader) {
211  if ((m_PassedChecks & eNS_Reader) == 0) {
212  server->RegisterAlert(eAccess, "reader privileges required "
213  "to execute " + cmd);
214  NCBI_THROW(CNetScheduleException, eAccessDenied,
215  "Access denied: reader privileges required");
216  }
217  }
218 
219  if (cmd_reqs & eNS_Program) {
220  if ((m_PassedChecks & eNS_Program) == 0) {
221  server->RegisterAlert(eAccess, "program privileges required "
222  "to execute " + cmd);
223  NCBI_THROW(CNetScheduleException, eAccessDenied,
224  "Access denied: program privileges required");
225  }
226  }
227 }
228 
229 
231 CNSClientId::x_ConvertToClaimedType(const string & claimed_type) const
232 {
233  string ci_claimed_type = claimed_type;
234  NStr::ToLower(ci_claimed_type);
235  if (ci_claimed_type == "submitter")
236  return eClaimedSubmitter;
237  if (ci_claimed_type == "worker node")
238  return eClaimedWorkerNode;
239  if (ci_claimed_type == "reader")
240  return eClaimedReader;
241  if (ci_claimed_type == "admin")
242  return eClaimedAdmin;
243  if (ci_claimed_type == "auto")
244  return eClaimedAutodetect;
245  if (ci_claimed_type == "reset")
246  return eClaimedReset;
247 
248  ERR_POST(Warning <<
249  "Unsupported client_type value at the handshake phase. Supported "
250  "values are: submitter, worker node, reader, admin and auto. "
251  "Received: " << m_ClientType);
252  return eClaimedNotProvided;
253 }
254 
255 
256 // If the client identification - client_node and/or client_session - breaks
257 // the length limit then it should be 'normalized'. Basically it is shortening
258 // the values in a special way embedding the MD5 checksum.
259 // See CXX-2617
260 string CNSClientId::x_NormalizeNodeOrSession(const string & val,
261  const string & key)
262 {
263  if (val.size() < kMaxWorkerNodeIdSize)
264  return val;
265 
266  // The limit is broken. Let's calculate the value MD5
267  CChecksum checksum(CChecksum::eMD5);
268  string checksum_as_string;
269 
270  checksum.AddLine(val);
271  checksum_as_string = checksum.GetHexSum();
272 
273 
274  string normalized;
275  if (checksum_as_string.size() >= kMaxWorkerNodeIdSize - 2) {
276  // Very defensive: if kMaxWorkerNodeIdSize is defined a way too small
277  normalized = checksum_as_string.substr(0, kMaxWorkerNodeIdSize - 1);
278  } else {
279  size_t outer_lenght = ((kMaxWorkerNodeIdSize - 1) - // available
280  2 - // separators
281  checksum_as_string.size()) // md5
282  / 2;
283  normalized = val.substr(0, outer_lenght) + "/" +
284  checksum_as_string + "/" +
285  val.substr(val.size() - outer_lenght, outer_lenght);
286  }
287 
288  ERR_POST("Client identification parameter " << key <<
289  " exceeds the max allowed length of " <<
290  kMaxWorkerNodeIdSize - 1 << " bytes. It will be replaced with " <<
291  normalized);
292  return normalized;
293 }
294 
295 
296 
298  m_NumberOfGiven(0),
299  m_Jobs(bm::BM_GAP),
300  m_BlacklistedJobs(bm::BM_GAP),
301  m_BlacklistTimeout(NULL),
302  m_WaitPort(0),
303  m_AffReset(false),
304  m_PrefAffinities(bm::BM_GAP),
305  m_WaitAffinities(bm::BM_GAP),
306  m_JobsOpCount(0),
307  m_BlacklistedJobsOpCount(0),
308  m_PrefAffinitiesOpCount(0),
309  m_WaitAffinitiesOpCount(0)
310 {}
311 
312 
314  m_NumberOfGiven(0),
315  m_Jobs(bm::BM_GAP),
316  m_BlacklistedJobs(bm::BM_GAP),
317  m_BlacklistTimeout(timeout),
318  m_WaitPort(0),
319  m_AffReset(false),
320  m_PrefAffinities(bm::BM_GAP),
321  m_WaitAffinities(bm::BM_GAP),
322  m_JobsOpCount(0),
323  m_BlacklistedJobsOpCount(0),
324  m_PrefAffinitiesOpCount(0),
325  m_WaitAffinitiesOpCount(0)
326 {}
327 
328 
329 // Checks if jobs should be removed from the node blacklist
331 {
332  if (!m_BlacklistLimits.empty()) {
333  CNSPreciseTime current_time = CNSPreciseTime::Current();
334  vector<unsigned int> to_be_removed;
335  for (const auto & item : m_BlacklistLimits) {
336  if (item.second < current_time)
337  to_be_removed.push_back(item.first);
338  }
339 
340  for (const auto & job_id : to_be_removed) {
341  m_BlacklistedJobs.set_bit(job_id, false);
342  x_BlacklistedOp();
343  m_BlacklistLimits.erase(job_id);
344  }
345  }
346 }
347 
348 
349 // Checks if a single job is in a blacklist and
350 // whether it should be removed from there
351 void SRemoteNodeData::UpdateBlacklist(unsigned int job_id) const
352 {
353  if (m_BlacklistLimits.empty())
354  return;
355 
356  if (!m_BlacklistedJobs.get_bit(job_id))
357  return;
358 
359  CNSPreciseTime current_time = CNSPreciseTime::Current();
360  map<unsigned int,
361  CNSPreciseTime>::iterator found = m_BlacklistLimits.find(job_id);
362  if (found != m_BlacklistLimits.end()) {
363  if (found->second < current_time) {
364  m_BlacklistedJobs.set_bit(job_id, false);
365  x_BlacklistedOp();
366  m_BlacklistLimits.erase(found);
367  }
368  }
369 }
370 
371 
372 string SRemoteNodeData::GetBlacklistLimit(unsigned int job_id) const
373 {
374  map<unsigned int,
375  CNSPreciseTime>::iterator found = m_BlacklistLimits.find(job_id);
376  if (found != m_BlacklistLimits.end())
377  return NS_FormatPreciseTime(found->second);
378  return "0.0";
379 }
380 
381 
382 void SRemoteNodeData::AddToBlacklist(unsigned int job_id,
383  const CNSPreciseTime & last_access)
384 {
385  if (m_BlacklistTimeout == NULL) {
386  ERR_POST("Design error in NetSchedule. "
387  "Blacklist timeout pointer must not be NULL. "
388  "Ignore blacklisting request and continue.");
389  return;
390  }
391 
393  return; // No need to blacklist the job (per configuration)
394 
395  // Here: the job must be blacklisted. So be attentive to overflow.
396  CNSPreciseTime last_time_in_list = last_access + *m_BlacklistTimeout;
397  if (last_time_in_list < last_access)
398  last_time_in_list = kTimeNever; // overflow
399 
400  m_BlacklistLimits[job_id] = last_time_in_list;
401  m_BlacklistedJobs.set_bit(job_id, true);
402  x_BlacklistedOp();
403 }
404 
405 
407 {
408  if (m_PrefAffinities.any()) {
411  return true;
412  }
413  return false;
414 }
415 
416 
417 void SRemoteNodeData::RegisterJob(unsigned int job_id)
418 {
419  m_Jobs.set_bit(job_id);
420  x_JobsOp();
421  ++m_NumberOfGiven;
422  return;
423 }
424 
425 
426 void SRemoteNodeData::UnregisterGivenJob(unsigned int job_id)
427 {
428  m_Jobs.set_bit(job_id, false);
429  x_JobsOp();
430 }
431 
432 
433 // returns true if the modifications have been really made
434 bool SRemoteNodeData::MoveJobToBlacklist(unsigned int job_id)
435 {
436  if (m_Jobs.get_bit(job_id)) {
437  m_Jobs.set_bit(job_id, false);
439  x_JobsOp();
440  return true;
441  }
442  return false;
443 }
444 
445 
447 {
448  if (aff != 0) {
449  if (!m_PrefAffinities.get_bit(aff)) {
452  return true;
453  }
454  }
455  return false;
456 }
457 
458 
459 // Used in the notifications code to test if a notification should be sent to
460 // the client
461 bool
463  bool use_preferred) const
464 {
465  if ((m_WaitAffinities & aff).any())
466  return true;
467 
468  if (use_preferred)
469  if ((m_PrefAffinities & aff).any())
470  return true;
471 
472  return false;
473 }
474 
475 
476 void
478  const vector<TJobStatus> & match_states)
479 {
480  // Checks if jobs should be removed from a reader blacklist
481  if (m_BlacklistLimits.empty())
482  return;
483 
484  unsigned int min_existed_id = tracker.GetMinJobID();
485  vector<unsigned int> to_be_removed;
486 
487  for (map<unsigned int,
488  CNSPreciseTime>::const_iterator k = m_BlacklistLimits.begin();
489  k != m_BlacklistLimits.end(); ++k) {
490  if (k->first < min_existed_id)
491  to_be_removed.push_back(k->first);
492  else {
493  TJobStatus status = tracker.GetStatus(k->first);
494  for (vector<TJobStatus>::const_iterator j = match_states.begin();
495  j != match_states.end(); ++j) {
496  if (status == *j) {
497  to_be_removed.push_back(k->first);
498  break;
499  }
500  }
501  }
502  }
503 
504  for (vector<unsigned int>::const_iterator m = to_be_removed.begin();
505  m != to_be_removed.end(); ++m) {
506  m_BlacklistedJobs.set_bit(*m, false);
507  x_BlacklistedOp();
509  }
510 }
511 
512 
513 // Handles the following cases:
514 // - GET2/READ with waiting has been interrupted by another GET2/READ
515 // - CWGET/CWREAD received
516 // - GET2/READ wait timeout is over
517 void
519 {
520  if (m_WaitAffinities.any())
522 
523  m_WaitPort = 0;
524 }
525 
526 
528 {
530  m_JobsOpCount = 0;
532  }
533 }
534 
536 {
540  }
541 }
542 
544 {
548  }
549 }
550 
552 {
556  }
557 }
558 
559 
560 // The CNSClient implementation
561 
562 // This constructor is for the map<> container
564  m_State(eActive),
565  m_Type(0),
566  m_ClaimedType(eClaimedNotProvided),
567  m_Addr(0),
568  m_ControlPort(0),
569  m_RegistrationTime(0, 0),
570  m_SessionStartTime(0, 0),
571  m_SessionResetTime(0, 0),
572  m_LastAccess(0, 0),
573  m_Session(),
574  m_ID(0),
575  m_NumberOfSubmitted(0),
576  m_NumberOfSockErrors(0),
577  m_ClientDataVersion(0)
578 {}
579 
580 
582  CNSPreciseTime * blacklist_timeout,
583  CNSPreciseTime * read_blacklist_timeout) :
584  m_State(eActive),
585  m_Type(0),
586  m_ClaimedType(client_id.GetType()),
587  m_Addr(client_id.GetAddress()),
588  m_ControlPort(client_id.GetControlPort()),
589  m_ClientHost(client_id.GetClientHost()),
590  m_RegistrationTime(0, 0),
591  m_SessionStartTime(0, 0),
592  m_SessionResetTime(0, 0),
593  m_LastAccess(0, 0),
594  m_Session(client_id.GetSession()),
595  m_ID(0),
596  m_NumberOfSubmitted(0),
597  m_NumberOfSockErrors(0),
598  m_ClientDataVersion(0),
599  m_WNData(blacklist_timeout),
600  m_ReaderData(read_blacklist_timeout),
601  m_LastScope(client_id.GetScope()),
602  m_ProgName(client_id.GetProgramName())
603 {
604  if (!client_id.IsComplete())
605  NCBI_THROW(CNetScheduleException, eInternalError,
606  "Creating client information object for old style clients");
610 
611  // It does not make any sense at the time of creation to have 'reset'
612  // client type
615 }
616 
617 
618 void CNSClient::RegisterJob(unsigned int job_id, ECommandGroup cmd_group)
619 {
621  if (cmd_group == eGet) {
622  m_Type |= eWorkerNode;
623  m_WNData.RegisterJob(job_id);
624  } else {
625  m_Type |= eReader;
626  m_ReaderData.RegisterJob(job_id);
627  }
628 }
629 
630 
632 {
634  m_Type |= eSubmitter;
635  m_NumberOfSubmitted += count;
636 }
637 
638 
639 void CNSClient::RegisterBlacklistedJob(unsigned int job_id,
640  ECommandGroup cmd_group)
641 {
642  // The type of the client should not be updated here.
643  // This operation is always prepended by GET/READ so the client type is
644  // set anyway.
646  if (cmd_group == eGet)
648  else
650 }
651 
652 
653 // It updates running and reading job vectors
654 // only if the client session has changed
655 void CNSClient::Touch(const CNSClientId & client_id)
656 {
658  m_State = eActive;
659  m_ControlPort = client_id.GetControlPort();
660  m_ClientHost = client_id.GetClientHost();
661  m_Addr = client_id.GetAddress();
662  m_ProgName = client_id.GetProgramName();
663 
664  EClaimedClientType claimed_client_type = client_id.GetType();
665  if (claimed_client_type == eClaimedReset) {
666  m_Type = 0;
668  } else if (claimed_client_type != eClaimedNotProvided)
669  m_ClaimedType = claimed_client_type;
670 
671  // Check the session id
672  if (m_Session == client_id.GetSession())
673  return; // It's still the same session, nothing to check
674 
677  m_Session = client_id.GetSession();
678 
679  // Affinity members are handled in the upper level of Touch()
680  // There is no need to do anything with the blacklisted jobs. If there are
681  // some jobs there they should stay in the list.
682 
683  m_WNData.m_AffReset = false;
684  m_ReaderData.m_AffReset = false;
685 }
686 
687 
688 // Prints the client info
689 string CNSClient::Print(const string & node_name,
690  const CQueue * queue,
691  const CNSAffinityRegistry & aff_registry,
692  const set< string > & wn_gc_clients,
693  const set< string > & read_gc_clients,
694  bool verbose) const
695 {
696  string buffer;
697  buffer.reserve(4096);
698 
699  buffer += "OK:CLIENT: '" + node_name + "'\n"
700  "OK: STATUS: " + x_StateAsString() + "\n"
701  "OK: PREFERRED AFFINITIES RESET: ";
702  if (m_WNData.m_AffReset) buffer += "TRUE\n";
703  else buffer += "FALSE\n";
704 
705  buffer += "OK: READER PREFERRED AFFINITIES RESET: ";
706  if (m_ReaderData.m_AffReset) buffer += "TRUE\n";
707  else buffer += "FALSE\n";
708 
709  if (m_LastAccess == kTimeZero)
710  buffer += "OK: LAST ACCESS: n/a\n";
711  else
712  buffer += "OK: LAST ACCESS: " +
714 
716  buffer += "OK: SESSION START TIME: n/a\n";
717  else
718  buffer += "OK: SESSION START TIME: " +
720 
722  buffer += "OK: REGISTRATION TIME: n/a\n";
723  else
724  buffer += "OK: REGISTRATION TIME: " +
726 
728  buffer += "OK: SESSION RESET TIME: n/a\n";
729  else
730  buffer += "OK: SESSION RESET TIME: " +
732 
733  buffer += "OK: PEER ADDRESS: " + CSocketAPI::gethostbyaddr(m_Addr) + "\n";
734 
735  buffer += "OK: CLIENT HOST: ";
736  if (m_ClientHost.empty())
737  buffer += "n/a\n";
738  else
739  buffer += m_ClientHost + "\n";
740 
741  buffer += "OK: WORKER NODE CONTROL PORT: ";
742  if (m_ControlPort == 0)
743  buffer += "n/a\n";
744  else
745  buffer += to_string(m_ControlPort) + "\n";
746 
747  if (m_Session.empty())
748  buffer += "OK: SESSION: n/a\n";
749  else
750  buffer += "OK: SESSION: '" + m_Session + "'\n";
751 
752  buffer += "OK: TYPE: " + x_TypeAsString() + "\n";
753 
754  buffer += "OK: NUMBER OF SUBMITTED JOBS: " +
755  to_string(m_NumberOfSubmitted) + "\n";
756 
757  // The Print() member is called under a lock in the client registry so it
758  // is safe here to get access to a const reference of the blacklisted jobs
759  // vector
760  const TNSBitVector & wn_blacklist = m_WNData.GetBlacklistedJobsRef();
761  buffer += "OK: NUMBER OF BLACKLISTED JOBS: " +
762  to_string(wn_blacklist.count()) + "\n";
763  if (verbose && wn_blacklist.any()) {
764  buffer += "OK: BLACKLISTED JOBS:\n";
765 
766  TNSBitVector::enumerator en(wn_blacklist.first());
767  for ( ; en.valid(); ++en) {
768  unsigned int job_id = *en;
769  TJobStatus status = queue->GetJobStatus(job_id);
770  buffer += "OK: " + queue->MakeJobKey(job_id) + " " +
771  m_WNData.GetBlacklistLimit(job_id) + " " +
772  CNetScheduleAPI::StatusToString(status) + "\n";
773  }
774  }
775 
776  // The Print() member is called under a lock in the client registry so it
777  // is safe here to get access to a const reference of the blacklisted jobs
778  // vector
779  const TNSBitVector & rd_blacklist = m_ReaderData.GetBlacklistedJobsRef();
780  buffer += "OK: NUMBER OF READ BLACKLISTED JOBS: " +
781  to_string(rd_blacklist.count()) + "\n";
782  if (verbose && rd_blacklist.any()) {
783  buffer += "OK: READ BLACKLISTED JOBS:\n";
784 
785  TNSBitVector::enumerator en(rd_blacklist.first());
786  for ( ; en.valid(); ++en) {
787  unsigned int job_id = *en;
788  TJobStatus status = queue->GetJobStatus(job_id);
789  buffer += "OK: " + queue->MakeJobKey(job_id) + " " +
790  m_ReaderData.GetBlacklistLimit(job_id) + " " +
791  CNetScheduleAPI::StatusToString(status) + "\n";
792  }
793  }
794 
795  buffer += "OK: NUMBER OF RUNNING JOBS: " +
796  to_string(m_WNData.m_Jobs.count()) + "\n";
797  if (verbose && m_WNData.m_Jobs.any()) {
798  buffer += "OK: RUNNING JOBS:\n";
799 
801  for ( ; en.valid(); ++en)
802  buffer += "OK: " + queue->MakeJobKey(*en) + "\n";
803  }
804 
805  buffer += "OK: NUMBER OF JOBS GIVEN FOR EXECUTION: " +
806  to_string(m_WNData.m_NumberOfGiven) + "\n";
807 
808  buffer += "OK: NUMBER OF READING JOBS: " +
809  to_string(m_ReaderData.m_Jobs.count()) + "\n";
810  if (verbose && m_ReaderData.m_Jobs.any()) {
811  buffer += "OK: READING JOBS:\n";
812 
814  for ( ; en.valid(); ++en)
815  buffer += "OK: " + queue->MakeJobKey(*en) + "\n";
816  }
817 
818  buffer += "OK: NUMBER OF JOBS GIVEN FOR READING: " +
819  to_string(m_ReaderData.m_NumberOfGiven) + "\n";
820 
821  buffer += "OK: NUMBER OF PREFERRED AFFINITIES: " +
822  to_string(m_WNData.m_PrefAffinities.count()) + "\n";
824  buffer += "OK: PREFERRED AFFINITIES:\n";
825 
827  for ( ; en.valid(); ++en)
828  buffer += "OK: '" + aff_registry.GetTokenByID(*en) + "'\n";
829  }
830 
831  buffer += "OK: NUMBER OF REQUESTED AFFINITIES: " +
832  to_string(m_WNData.m_WaitAffinities.count()) + "\n";
834  buffer += "OK: REQUESTED AFFINITIES:\n";
835 
837  for ( ; en.valid(); ++en)
838  buffer += "OK: '" + aff_registry.GetTokenByID(*en) + "'\n";
839  }
840 
841  buffer += "OK: NUMBER OF READER PREFERRED AFFINITIES: " +
842  to_string(m_ReaderData.m_PrefAffinities.count()) +
843  "\n";
845  buffer += "OK: READER PREFERRED AFFINITIES:\n";
846 
848  for ( ; en.valid(); ++en)
849  buffer += "OK: '" + aff_registry.GetTokenByID(*en) + "'\n";
850  }
851 
852  buffer += "OK: NUMBER OF READER REQUESTED AFFINITIES: " +
853  to_string(m_ReaderData.m_WaitAffinities.count()) +
854  "\n";
856  buffer += "OK: READER REQUESTED AFFINITIES:\n";
857 
859  for ( ; en.valid(); ++en)
860  buffer += "OK: '" + aff_registry.GetTokenByID(*en) + "'\n";
861  }
862 
863  buffer += "OK: NUMBER OF SOCKET WRITE ERRORS: " +
864  to_string(m_NumberOfSockErrors) + "\n"
865  "OK: DATA: '" + NStr::PrintableString(m_ClientData) + "'\n"
866  "OK: DATA VERSION: " + to_string(m_ClientDataVersion) + "\n";
867 
868  if (wn_gc_clients.find(node_name) == wn_gc_clients.end())
869  buffer += "OK: WN AFFINITIES GARBAGE COLLECTED: FALSE\n";
870  else
871  buffer += "OK: WN AFFINITIES GARBAGE COLLECTED: TRUE\n";
872 
873  if (read_gc_clients.find(node_name) == read_gc_clients.end())
874  buffer += "OK: READER AFFINITIES GARBAGE COLLECTED: FALSE\n";
875  else
876  buffer += "OK: READER AFFINITIES GARBAGE COLLECTED: TRUE\n";
877  buffer += "OK: LAST SCOPE: '" + m_LastScope + "'\n"
878  "OK: PROGRAM NAME: '" +
880 
881  return buffer;
882 }
883 
884 
885 void
887  ECommandGroup cmd_group)
888 {
889  vector<TJobStatus> match_states;
890  match_states.push_back(CNetScheduleAPI::eJobNotFound);
891  match_states.push_back(CNetScheduleAPI::eConfirmed);
892  match_states.push_back(CNetScheduleAPI::eReadFailed);
893 
894  if (cmd_group == eGet) {
895  match_states.push_back(CNetScheduleAPI::eDone);
896  match_states.push_back(CNetScheduleAPI::eFailed);
897  match_states.push_back(CNetScheduleAPI::eCanceled);
898 
899  m_WNData.GCBlacklist(tracker, match_states);
900  } else
901  m_ReaderData.GCBlacklist(tracker, match_states);
902 }
903 
904 
905 int CNSClient::SetClientData(const string & data, int data_version)
906 {
907  if (data_version != -1 && data_version != m_ClientDataVersion)
908  NCBI_THROW(CNetScheduleException, eClientDataVersionMismatch,
909  "client data version does not match");
910  m_ClientData = data;
912  if (m_ClientDataVersion < 0)
914  return m_ClientDataVersion;
915 }
916 
917 
918 // See CXX-5324 for what virtual scope is
919 string CNSClient::GetVirtualScope(const string & client_node) const
920 {
921  return kVirtualScopePrefix + client_node;
922 }
923 
924 
925 string CNSClient::x_TypeAsString(void) const
926 {
927  string result;
928 
929  switch (m_ClaimedType) {
930  case eClaimedSubmitter: return "submitter";
931  case eClaimedWorkerNode: return "worker node";
932  case eClaimedReader: return "reader";
933  case eClaimedAdmin: return "admin";
934 
935  // autodetect or not provided falls to detection by a command group
936  case eClaimedAutodetect:
937  case eClaimedNotProvided:
938  default:
939  break;
940  }
941 
942  if (m_Type & eSubmitter)
943  result = "submitter";
944 
945  if (m_Type & eWorkerNode) {
946  if (!result.empty())
947  result += " | ";
948  result += "worker node";
949  }
950 
951  if (m_Type & eReader) {
952  if (!result.empty())
953  result += " | ";
954  result += "reader";
955  }
956 
957  if (m_Type & eAdmin) {
958  if (!result.empty())
959  result += " | ";
960  result += "admin";
961  }
962 
963  if (result.empty())
964  return "unknown";
965  return result;
966 }
967 
968 
969 string CNSClient::x_StateAsString(void) const
970 {
971  switch (m_State) {
972  case eActive: return "active";
973  case eWNStale: return "worker node stale";
974  case eReaderStale: return "reader stale";
975  case eWNAndReaderStale: return "worker node stale | reader stale";
976  case eQuit: return "quit";
977  default: ;
978  }
979  return "unknown";
980 }
981 
983 
Checksum and hash calculation classes.
CChecksum – Checksum calculator.
Definition: checksum.hpp:302
string GetTokenByID(unsigned int aff_id) const
string m_ClientName
Definition: ns_clients.hpp:143
bool IsAdmin(void) const
Definition: ns_clients.hpp:112
EClaimedClientType GetType(void) const
Definition: ns_clients.hpp:90
string m_ProgName
Definition: ns_clients.hpp:141
string x_NormalizeNodeOrSession(const string &val, const string &key)
Definition: ns_clients.cpp:260
unsigned int m_ID
Definition: ns_clients.hpp:163
string m_ClientSession
Definition: ns_clients.hpp:147
bool IsComplete(void) const
Definition: ns_clients.cpp:144
const string & GetProgramName(void) const
Definition: ns_clients.hpp:98
string m_ClientNode
Definition: ns_clients.hpp:145
void SetClientName(const string &client_name)
Definition: ns_clients.cpp:164
const string & GetSession(void) const
Definition: ns_clients.hpp:88
void CheckAccess(TNSCommandChecks cmd_reqs, CNetScheduleServer *server, const string &cmd)
Definition: ns_clients.cpp:170
unsigned short m_ControlPort
Definition: ns_clients.hpp:150
EClaimedClientType m_ClientType
Definition: ns_clients.hpp:149
unsigned int m_Addr
Definition: ns_clients.hpp:140
string m_ClientHost
Definition: ns_clients.hpp:151
void Update(unsigned int peer_addr, const TNSProtoParams &params)
Definition: ns_clients.cpp:67
string m_Scope
Definition: ns_clients.hpp:165
const string & GetClientHost(void) const
Definition: ns_clients.hpp:96
unsigned int GetAddress(void) const
Definition: ns_clients.hpp:84
EClaimedClientType x_ConvertToClaimedType(const string &claimed_type) const
Definition: ns_clients.cpp:231
TNSCommandChecks m_PassedChecks
Definition: ns_clients.hpp:154
string GetVirtualScope(void) const
Definition: ns_clients.cpp:153
unsigned short GetControlPort(void) const
Definition: ns_clients.hpp:94
size_t m_NumberOfSockErrors
Definition: ns_clients.hpp:549
string Print(const string &node_name, const CQueue *queue, const CNSAffinityRegistry &aff_registry, const set< string > &gc_clients, const set< string > &read_gc_clients, bool verbose) const
Definition: ns_clients.cpp:689
string m_ProgName
Definition: ns_clients.hpp:558
void RegisterSubmittedJobs(size_t count)
Definition: ns_clients.cpp:631
string m_LastScope
Definition: ns_clients.hpp:557
CNSPreciseTime m_SessionStartTime
Definition: ns_clients.hpp:541
int SetClientData(const string &data, int data_version)
Definition: ns_clients.cpp:905
void RegisterJob(unsigned int job_id, ECommandGroup cmd_group)
Definition: ns_clients.cpp:618
CNSPreciseTime m_LastAccess
Definition: ns_clients.hpp:543
EClaimedClientType m_ClaimedType
Definition: ns_clients.hpp:535
void RegisterBlacklistedJob(unsigned int job_id, ECommandGroup cmd_group)
Definition: ns_clients.cpp:639
CNSPreciseTime m_RegistrationTime
Definition: ns_clients.hpp:540
unsigned short m_ControlPort
Definition: ns_clients.hpp:537
string m_Session
Definition: ns_clients.hpp:545
int m_ClientDataVersion
Definition: ns_clients.hpp:552
string m_ClientData
Definition: ns_clients.hpp:551
ENSClientState m_State
Definition: ns_clients.hpp:525
string m_ClientHost
Definition: ns_clients.hpp:538
SRemoteNodeData m_WNData
Definition: ns_clients.hpp:554
string GetVirtualScope(const string &client_node) const
Definition: ns_clients.cpp:919
@ eWNAndReaderStale
Definition: ns_clients.hpp:309
CNSPreciseTime m_SessionResetTime
Definition: ns_clients.hpp:542
void Touch(const CNSClientId &client_id)
Definition: ns_clients.cpp:655
unsigned int m_Addr
Definition: ns_clients.hpp:536
void GCBlacklistedJobs(const CJobStatusTracker &tracker, ECommandGroup cmd_group)
Definition: ns_clients.cpp:886
string x_TypeAsString(void) const
Definition: ns_clients.cpp:925
unsigned int m_Type
Definition: ns_clients.hpp:527
size_t m_NumberOfSubmitted
Definition: ns_clients.hpp:548
SRemoteNodeData m_ReaderData
Definition: ns_clients.hpp:555
string x_StateAsString(void) const
Definition: ns_clients.cpp:969
static CNSPreciseTime Current(void)
NetSchedule internal exception.
NetScheduler threaded server.
Definition: ns_server.hpp:57
void RegisterAlert(EAlertType alert_type, const string &message)
Definition: ns_server.cpp:643
string MakeJobKey(unsigned int job_id) const
Definition: ns_queue.cpp:4046
TJobStatus GetJobStatus(unsigned job_id) const
Definition: ns_queue.cpp:1941
Constant iterator designed to enumerate "ON" bits.
Definition: bm.h:603
bool valid() const noexcept
Checks if iterator is still valid.
Definition: bm.h:283
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
@ opt_free_0
Free unused 0 blocks.
Definition: bm.h:135
bool get_bit(size_type n) const noexcept
returns true if bit n is set and false is bit n is 0.
Definition: bm.h:3602
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
Definition: bm.h:2451
void optimize(bm::word_t *temp_block=0, optmode opt_mode=opt_compress, statistics *stat=0)
Optimize memory bitvector's memory allocation.
Definition: bm.h:3635
bool set_bit(size_type n, bool val=true)
Sets bit n.
Definition: bm.h:4227
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
Definition: bm.h:1871
void clear(const size_type *ids, size_type ids_size, bm::sort_order so=bm::BM_UNKNOWN)
Clear a list of bits in this bitset.
Definition: bm.h:4149
size_type count() const noexcept
population count (count of ON bits)
Definition: bm.h:2401
void erase(iterator pos)
Definition: map.hpp:167
const_iterator begin() const
Definition: map.hpp:151
const_iterator end() const
Definition: map.hpp:152
bool empty() const
Definition: map.hpp:149
const_iterator find(const key_type &key) const
Definition: map.hpp:153
const_iterator find(const key_type &key) const
Definition: set.hpp:137
const_iterator end() const
Definition: set.hpp:136
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
#define false
Definition: bool.h:36
char data[12]
Definition: iconv.c:80
#define NULL
Definition: ncbistd.hpp:225
string GetHexSum(void) const
Return string with checksum in hexadecimal form.
Definition: checksum.hpp:353
void AddLine(const char *line, size_t len)
Definition: checksum.hpp:609
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
@ eDone
Job is ready (computed successfully)
@ eConfirmed
Final state - read confirmed.
@ eCanceled
Explicitly canceled.
@ eJobNotFound
No such job.
@ eReadFailed
Final state - read failed.
@ eFailed
Failed to run (execution timeout)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string gethostbyaddr(unsigned int host, ESwitch log=eOff)
Return empty string on error.
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3953
#define kEmptyStr
Definition: ncbistr.hpp:123
static string ParseEscapes(const CTempString str, EEscSeqRange mode=eEscSeqRange_Standard, char user_char='?')
Parse C-style escape sequences in the specified string.
Definition: ncbistr.cpp:4793
static string & ToLower(string &str)
Convert string to lower case – string& version.
Definition: ncbistr.cpp:405
@ BM_GAP
GAP compression is ON.
Definition: bmconst.h:148
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
NetSchedule job status tracker.
#include<zmmintrin.h>
Definition: bm.h:78
const struct ncbi::grid::netcache::search::fields::KEY key
#define GetProgramName
Avoid name clash with the NCBI C Toolkit.
Definition: ncbienv.hpp:49
NetSchedule client specs.
unsigned int TNSCommandChecks
Definition: ns_access.hpp:50
@ eNS_Submitter
Definition: ns_access.hpp:44
@ eNS_Queue
Definition: ns_access.hpp:42
@ eNS_Worker
Definition: ns_access.hpp:45
@ eNS_Reader
Definition: ns_access.hpp:47
@ eNS_Program
Definition: ns_access.hpp:46
@ eNS_Admin
Definition: ns_access.hpp:43
const size_t k_OpLimitToOptimize
Definition: ns_affinity.hpp:48
@ eAccess
Definition: ns_alert.hpp:52
const string kVirtualScopePrefix
Definition: ns_clients.cpp:51
EClaimedClientType
Definition: ns_clients.hpp:55
@ eClaimedWorkerNode
Definition: ns_clients.hpp:57
@ eClaimedAdmin
Definition: ns_clients.hpp:59
@ eClaimedNotProvided
Definition: ns_clients.hpp:64
@ eClaimedReader
Definition: ns_clients.hpp:58
@ eClaimedReset
Definition: ns_clients.hpp:62
@ eClaimedAutodetect
Definition: ns_clients.hpp:60
@ eClaimedSubmitter
Definition: ns_clients.hpp:56
const CNSPreciseTime kTimeZero
const CNSPreciseTime kTimeNever
string NS_FormatPreciseTime(const CNSPreciseTime &t)
const unsigned kMaxWorkerNodeIdSize
Definition: ns_types.hpp:117
ECommandGroup
Definition: ns_types.hpp:54
@ eGet
Definition: ns_types.hpp:55
static pcre_uint8 * buffer
Definition: pcretest.c:1051
true_type verbose
Definition: processing.cpp:890
NetSchedule queue client version control.
void AddToBlacklist(unsigned int job_id, const CNSPreciseTime &last_access)
Definition: ns_clients.cpp:382
void x_BlacklistedOp(void) const
Definition: ns_clients.cpp:535
TNSBitVector m_WaitAffinities
Definition: ns_clients.hpp:264
TNSBitVector m_Jobs
Definition: ns_clients.hpp:249
map< unsigned int, CNSPreciseTime > m_BlacklistLimits
Definition: ns_clients.hpp:256
size_t m_JobsOpCount
Definition: ns_clients.hpp:267
CNSPreciseTime * m_BlacklistTimeout
Definition: ns_clients.hpp:253
void x_UpdateBlacklist(void) const
Definition: ns_clients.cpp:330
void GCBlacklist(const CJobStatusTracker &tracker, const vector< TJobStatus > &match_states)
Definition: ns_clients.cpp:477
unsigned short m_WaitPort
Definition: ns_clients.hpp:259
size_t m_NumberOfGiven
Definition: ns_clients.hpp:246
size_t m_WaitAffinitiesOpCount
Definition: ns_clients.hpp:270
void CancelWaiting(void)
Definition: ns_clients.cpp:518
TNSBitVector m_BlacklistedJobs
Definition: ns_clients.hpp:252
const TNSBitVector & GetBlacklistedJobsRef(void) const
Definition: ns_clients.hpp:183
TNSBitVector m_PrefAffinities
Definition: ns_clients.hpp:263
void UpdateBlacklist(unsigned int job_id) const
Definition: ns_clients.cpp:351
void x_JobsOp(void) const
Definition: ns_clients.cpp:527
bool ClearPreferredAffinities(void)
Definition: ns_clients.cpp:406
void RegisterJob(unsigned int job_id)
Definition: ns_clients.cpp:417
void x_PrefAffinitiesOp(void) const
Definition: ns_clients.cpp:543
void ClearWaitAffinities(void)
Definition: ns_clients.hpp:226
size_t m_BlacklistedJobsOpCount
Definition: ns_clients.hpp:268
void x_WaitAffinitiesOp(void) const
Definition: ns_clients.cpp:551
bool AddPreferredAffinity(unsigned int aff)
Definition: ns_clients.cpp:446
size_t m_PrefAffinitiesOpCount
Definition: ns_clients.hpp:269
bool MoveJobToBlacklist(unsigned int job_id)
Definition: ns_clients.cpp:434
string GetBlacklistLimit(unsigned int job_id) const
Definition: ns_clients.cpp:372
bool IsRequestedAffinity(const TNSBitVector &aff, bool use_preferred) const
Definition: ns_clients.cpp:462
void UnregisterGivenJob(unsigned int job_id)
Definition: ns_clients.cpp:426
else result
Definition: token2.c:20
CScope & GetScope()
Modified on Thu Jun 13 17:28:27 2024 by modify_doxy.py rev. 669887