NCBI C++ ToolKit
ns_notifications.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_notifications.cpp 93347 2021-04-05 17:19:57Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov, Sergey Satskiy
27  *
28  * File Description: Support for notifying clients which wait for a job
29  * after issuing the WGET command. The support includes
30  * a list of active notifications and a thread which sends
31  * notifications.
32  */
33 
34 #include <ncbi_pch.hpp>
35 #include <corelib/request_ctx.hpp>
36 
37 #include "ns_notifications.hpp"
38 #include "queue_database.hpp"
39 #include "ns_precise_time.hpp"
40 
41 #include "ns_clients.hpp"
42 #include "ns_clients_registry.hpp"
43 #include "ns_affinity.hpp"
44 #include "ns_handler.hpp"
45 #include "ns_group.hpp"
46 #include <random>
47 
49 
50 
52  const CNSClientsRegistry & clients_registry,
53  const CNSAffinityRegistry & aff_registry,
54  const CNSGroupsRegistry & group_registry,
55  bool is_active,
56  bool verbose) const
57 {
58  static string s_True = "TRUE\n";
59  static string s_False = "FALSE\n";
60  string buffer;
61 
62  buffer += "OK:CLIENT: '" + m_ClientNode + "'\n"
63  "OK: RECEPIENT ADDRESS: " +
65  to_string(m_Port) + "\n"
66  "OK: LIFE TIME: " + NS_FormatPreciseTime(m_Lifetime) + "\n";
67 
68  buffer += "OK: ANY JOB: ";
69  if (m_AnyJob)
70  buffer += s_True;
71  else
72  buffer += s_False;
73 
74  if (verbose == false) {
75  buffer += "OK: EXPLICIT AFFINITIES: n/a (available in VERBOSE mode)\n";
76  }
77  else {
78  if (m_ClientNode.empty())
79  buffer += "OK: EXPLICIT AFFINITIES: CLIENT NOT FOUND\n";
80  else {
81  TNSBitVector wait_aff =
82  clients_registry.GetWaitAffinities(m_ClientNode,
83  eGet);
84 
85  if (wait_aff.any()) {
86  buffer += "OK: EXPLICIT AFFINITIES:\n";
87 
88  TNSBitVector::enumerator en(wait_aff.first());
89  for ( ; en.valid(); ++en)
90  buffer += "OK: '" +
91  aff_registry.GetTokenByID(*en) + "'\n";
92  }
93  else
94  buffer += "OK: EXPLICIT AFFINITIES: NONE\n";
95  }
96  }
97 
98  if (verbose == false) {
99  buffer += "OK: USE PREFERRED AFFINITIES: ";
100  if (m_WnodeAff)
101  buffer += s_True;
102  else
103  buffer += s_False;
104  }
105  else {
106  if (m_WnodeAff) {
107 
108  // Need to print resolved preferred affinities
109  TNSBitVector pref_aff =
110  clients_registry.GetPreferredAffinities(m_ClientNode,
111  eGet);
112 
113  if (pref_aff.any()) {
114  buffer += "OK: USE PREFERRED AFFINITIES:\n";
115 
116  TNSBitVector::enumerator en(pref_aff.first());
117  for ( ; en.valid(); ++en)
118  buffer += "OK: '" +
119  aff_registry.GetTokenByID(*en) + "'\n";
120  }
121  else
122  buffer += "OK: USE PREFERRED AFFINITIES: NONE\n";
123  }
124  else
125  buffer += "OK: USE PREFERRED AFFINITIES: FALSE\n";
126  }
127 
128  buffer += "OK: EXCLUSIVE NEW AFFINITY: ";
129  if (m_ExclusiveNewAff)
130  buffer += s_True;
131  else
132  buffer += s_False;
133 
134  if (!m_Groups.any()) {
135  buffer += "OK: GROUPS: NONE\n";
136  } else {
137  if (verbose) {
138  buffer += "OK: GROUPS:\n";
140  for ( ; en.valid(); ++en) {
141  try {
142  string token = group_registry.ResolveGroup(*en);
143  buffer += "OK: '" + token + "'\n";
144  } catch (const exception & ex) {
145  ERR_POST("Error resolving group number while printing "
146  "the notification registry: " << ex.what());
147  } catch (...) {
148  ERR_POST("Unknown resolving group number error while "
149  "printing the notification registry");
150  }
151  }
152  } else
153  buffer += "OK: GROUPS: n/a (available in VERBOSE mode)\n";
154  }
155 
156  buffer += "OK: REASON: ";
157  if (m_Reason == eGet)
158  buffer += "GET\n";
159  else
160  buffer += "READ\n";
161 
162  buffer += "OK: ACTIVE: ";
163  if (is_active)
164  buffer += s_True;
165  else
166  buffer += s_False;
167 
169  buffer += "OK: HIGH FREQUENCY LIFE TIME: " +
171  else
172  buffer += "OK: HIGH FREQUENCY LIFE TIME: n/a\n";
173 
174  buffer += "OK: SLOW RATE ACTIVE: ";
175  if (m_SlowRate)
176  buffer += s_True;
177  else
178  buffer += s_False;
179 
180  return buffer;
181 }
182 
183 
185  const string & ns_node,
186  const string & qname) :
187  m_JobChangeNotifConstPart("ns_node=" + ns_node + "&job_key="),
188  m_QueueDB(qdb)
189 {
191  "reason=get&ns_node=%s&queue=%s",
192  ns_node.c_str(), qname.c_str()) + 1;
194  "reason=read&ns_node=%s&queue=%s",
195  ns_node.c_str(), qname.c_str()) + 1;
199  "NCBI_JSQ_%s", qname.c_str()) + 1;
200 }
201 
202 
203 void
205  const CNSClientId & client,
206  unsigned short port,
207  unsigned int timeout,
208  bool wnode_aff,
209  bool any_job,
210  bool exclusive_new_affinity,
211  bool new_format,
212  const TNSBitVector & groups,
213  ECommandGroup reason)
214 {
215  unsigned int address = client.GetAddress();
216  list<SNSNotificationAttributes>::iterator found;
218 
219 
220  found = x_FindListener(m_PassiveListeners, address, port, reason);
221  if (found != m_PassiveListeners.end()) {
222  // Passive was here
223  found->m_Lifetime = CNSPreciseTime::Current() +
224  CNSPreciseTime(timeout, 0);
225  found->m_ClientNode = client.GetNode();
226  found->m_WnodeAff = wnode_aff;
227  found->m_AnyJob = any_job;
228  found->m_ExclusiveNewAff = exclusive_new_affinity;
229  found->m_NewFormat = new_format;
230  found->m_Groups = groups;
231  found->m_HifreqNotifyLifetime = kTimeZero;
232  found->m_SlowRate = false;
233  found->m_SlowRateCount = 0;
234  found->m_Reason = reason;
235 
236  // If it is an old client => there is no record in the clients
237  // registry, so there is no information stored of what explicit
238  // affinities were provided even so. Therefore, the old clients
239  // should be notified when any job is available.
240  if (found->m_ClientNode.empty())
241  found->m_AnyJob = true;
242 
243  return;
244  }
245 
246  found = x_FindListener(m_ActiveListeners, address, port, reason);
247  if (found != m_ActiveListeners.end()) {
248  // Active was here - remove it from the active list and insert a new
249  // record into the passive list as it would be absolutely new one.
250  m_ActiveListeners.erase(found);
251  }
252 
253 
254  // New record should be inserted
256 
257  attributes.m_Address = address;
258  attributes.m_Port = port;
259  attributes.m_Lifetime = CNSPreciseTime::Current() +
260  CNSPreciseTime(timeout, 0);
261  attributes.m_ClientNode = client.GetNode();
262  attributes.m_WnodeAff = wnode_aff;
263  attributes.m_AnyJob = any_job;
264  attributes.m_ExclusiveNewAff = exclusive_new_affinity;
265  attributes.m_NewFormat = new_format;
266  attributes.m_Groups = groups;
267  attributes.m_HifreqNotifyLifetime = kTimeZero;
268  attributes.m_SlowRate = false;
269  attributes.m_SlowRateCount = 0;
270  attributes.m_Reason = reason;
271 
272  // See the remark above about old clients.
273  if (attributes.m_ClientNode.empty())
274  attributes.m_AnyJob = true;
275 
276  m_PassiveListeners.push_back(attributes);
277 }
278 
279 
281  unsigned short port,
282  ECommandGroup cmd_group)
283 {
284  UnregisterListener(client.GetAddress(), port, cmd_group);
285 }
286 
287 
288 void CNSNotificationList::UnregisterListener(unsigned int address,
289  unsigned short port,
290  ECommandGroup cmd_group)
291 {
293  list<SNSNotificationAttributes>::iterator found;
294 
295  found = x_FindListener(m_PassiveListeners, address, port, cmd_group);
296  if (found != m_PassiveListeners.end()) {
297  m_PassiveListeners.erase(found);
298  return;
299  }
300 
301  found = x_FindListener(m_ActiveListeners, address, port, cmd_group);
302  if (found != m_ActiveListeners.end())
303  m_ActiveListeners.erase(found);
304 }
305 
306 
307 void
309  unsigned short port,
310  const string & notification)
311 {
313  m_StatusNotificationSocket.Send(notification.c_str(),
314  notification.size() + 1,
315  CSocketAPI::ntoa(address), port);
316 }
317 
318 
319 string
321  const CJob & job,
322  const string & job_key,
323  TJobStatus job_status,
324  ENotificationReason reason)
325 {
326  string notification;
327  notification.reserve(2048);
328 
329  // "ns_node=<node>&job_key="
330  notification +=
331  m_JobChangeNotifConstPart + job_key +
332  "&job_status=" + CNetScheduleAPI::StatusToString(job_status) +
333  "&last_event_index=" + to_string(job.GetLastEventIndex()) +
334  "&reason=";
335 
336  switch (reason) {
337  case eStatusChanged:
338  notification += "status";
339  break;
340  case eNotificationStolen:
341  notification += "stolen";
342  break;
344  notification += "progress";
345  break;
346  default:
347  notification += "unknown";
348  };
349 
350  const string & progress_msg = job.GetProgressMsg();
351  string url_encoded_msg = NStr::URLEncode(progress_msg);
352  size_t encoded_msg_size = url_encoded_msg.size();
353  const size_t size_limit = 768;
354 
355  if (encoded_msg_size > size_limit) {
356  size_t msg_size = progress_msg.size();
357  size_t truncate_count = encoded_msg_size - size_limit;
358  notification +=
359  "&msg=" + NStr::URLEncode(
360  progress_msg.substr(0, msg_size - truncate_count)) +
361  "&msg_truncated=" + to_string(truncate_count);
362  } else {
363  notification += "&msg=" + url_encoded_msg;
364  }
365 
366  return notification;
367 }
368 
369 
370 // Checks if a timeout is over and delete those records;
371 // Called from the notification thread when there are no jobs in pending state,
372 // so the notification flag should be reset.
374  CNSClientsRegistry & clients_registry,
375  ECommandGroup cmd_group)
376 {
377  list<SNSNotificationAttributes>::iterator rec;
379 
380  // Passive records could only be deleted
381  rec = m_PassiveListeners.begin();
382  while (rec != m_PassiveListeners.end()) {
383  if (x_TestTimeout(current_time, clients_registry,
384  m_PassiveListeners, rec))
385  continue;
386  ++rec;
387  }
388 
389  // Active records could be deleted or moved to the passive list
390  rec = m_ActiveListeners.begin();
391  while (rec != m_ActiveListeners.end()) {
392  if (x_TestTimeout(current_time, clients_registry,
393  m_ActiveListeners, rec))
394  continue;
395 
396  // Need to move the record to the passive list
397  if (cmd_group == rec->m_Reason) {
399  rec = m_ActiveListeners.erase(rec);
400  } else {
401  ++rec;
402  }
403  }
404 }
405 
406 
407 // Called from a notification thread which notifies worker nodes periodically
408 void
410  const CNSPreciseTime & current_time,
411  unsigned int notif_lofreq_mult,
412  CNSClientsRegistry & clients_registry)
413 {
415  list<SNSNotificationAttributes>::iterator k = m_ActiveListeners.begin();
416 
417  while (k != m_ActiveListeners.end()) {
418  if (x_TestTimeout(current_time, clients_registry,
419  m_ActiveListeners, k))
420  continue;
421 
422  // The record should not be deleted, so send the notification
423  if (k->m_SlowRate || current_time > k->m_HifreqNotifyLifetime) {
424  k->m_SlowRate = true;
425 
426  // We are at slow rate; might need to skip some
427  k->m_SlowRateCount += 1;
428  if (k->m_SlowRateCount > notif_lofreq_mult) {
429  k->m_SlowRateCount = 0;
430  // Send the same packet twice: Denis wanted to increase
431  // the UDP delivery probability
432  x_SendNotificationPacket(k->m_Address, k->m_Port,
433  k->m_NewFormat, k->m_Reason);
434  x_SendNotificationPacket(k->m_Address, k->m_Port,
435  k->m_NewFormat, k->m_Reason);
436  }
437  } else {
438  // We are at fast rate
439  if (x_IsInExactList(k->m_Address, k->m_Port) == false)
440  x_SendNotificationPacket(k->m_Address, k->m_Port,
441  k->m_NewFormat, k->m_Reason);
442  }
443  ++k;
444  }
445 }
446 
447 
448 void
450  const TNSBitVector & outdated_jobs,
451  CNSClientsRegistry & clients_registry,
452  const CNSPreciseTime & notif_highfreq_period,
453  ECommandGroup cmd_group)
454 {
455  TNSBitVector candidates;
456  CNSPreciseTime current_time = CNSPreciseTime::Current();
458  list<SNSNotificationAttributes>::iterator k = m_PassiveListeners.begin();
459 
460  while (k != m_PassiveListeners.end()) {
461  if (k->m_ExclusiveNewAff) {
462  candidates = outdated_jobs;
463  clients_registry.SubtractBlacklistedJobs(k->m_ClientNode, cmd_group,
464  candidates);
465  if (candidates.any()) {
466  x_SendNotificationPacket(k->m_Address, k->m_Port,
467  k->m_NewFormat, k->m_Reason);
468 
469  // Move from passive list to the active one
470  k->m_HifreqNotifyLifetime = current_time +
471  notif_highfreq_period;
473  k = m_PassiveListeners.erase(k);
474  continue;
475  }
476  }
477  ++k;
478  }
479 }
480 
481 
482 // Called when a vacant job for GET (or READ) has appeared:
483 // submit, batch submit, return, timeout, fail (RDRB, PUT, FPUT, CANCEL) etc.
484 void CNSNotificationList::Notify(unsigned int job_id,
485  unsigned int aff_id,
486  CNSClientsRegistry & clients_registry,
487  CNSAffinityRegistry & aff_registry,
488  CNSGroupsRegistry & group_registry,
489  CNSScopeRegistry & scope_registry,
490  const CNSPreciseTime & notif_highfreq_period,
491  const CNSPreciseTime & notif_handicap,
492  ECommandGroup reason)
493 {
494  TNSBitVector aff_ids;
495  TNSBitVector jobs;
496 
497  if (aff_id != 0)
498  aff_ids.set_bit(aff_id);
499 
500  jobs.set_bit(job_id);
501 
502  Notify(jobs, aff_ids, aff_id == 0,
503  clients_registry, aff_registry, group_registry, scope_registry,
504  notif_highfreq_period, notif_handicap, reason);
505 }
506 
507 
508 // Called when a vacant job for GET (or READ) has appeared:
509 // submit, batch submit, return, timeout, fail (RDRB, PUT, FPUT, CANCEL) etc.
510 void
512  const TNSBitVector & affinities,
513  bool no_aff_jobs,
514  CNSClientsRegistry & clients_registry,
515  CNSAffinityRegistry & aff_registry,
516  CNSGroupsRegistry & group_registry,
517  CNSScopeRegistry & scope_registry,
518  const CNSPreciseTime & notif_highfreq_period,
519  const CNSPreciseTime & notif_handicap,
520  ECommandGroup reason)
521 {
522  CNSPreciseTime current_time = CNSPreciseTime::Current();
523  TNSBitVector all_preferred_affs =
524  clients_registry.GetAllPreferredAffinities(reason);
525  TNSBitVector candidates;
526 
527 
528  // Support randomized UDP notifications
529  // See CXX-3662
530  vector<SNSNotificationAttributes*> targets;
531  bool be_random = (notif_handicap.tv_sec != 0 ||
532  notif_handicap.tv_nsec != 0);
533 
535  list<SNSNotificationAttributes>::iterator k = m_PassiveListeners.begin();
536 
537  while (k != m_PassiveListeners.end()) {
538  if (x_TestTimeout(current_time, clients_registry,
539  m_PassiveListeners, k))
540  continue;
541 
542  if (reason != k->m_Reason) {
543  ++k;
544  continue;
545  }
546 
547  // The notification timeout is not over
548  bool should_send = false;
549 
550  // Check whether all the jobs are in the client's blacklist
551  candidates = jobs;
552  clients_registry.SubtractBlacklistedJobs(k->m_ClientNode, reason,
553  candidates);
554 
555  if (candidates.any() == false) {
556  ++k;
557  continue;
558  }
559 
560  // Check if the group restriction is applicable
561  if (k->m_Groups.any()) {
562  group_registry.RestrictByGroup(k->m_Groups, candidates);
563  if (candidates.any() == false) {
564  ++k;
565  continue;
566  }
567  }
568 
569  string scope;
570  string virtual_scope;
571  clients_registry.GetScopes(k->m_ClientNode, scope, virtual_scope);
572 
573  if (!virtual_scope.empty()) {
574  // The client was found in the client registry so the jobs need to
575  // be restricted
576  if (scope.empty() || scope == kNoScopeOnly) {
577  // no scope and virtual scope are suitable
578  candidates -= (scope_registry.GetAllJobsInScopes() -
579  scope_registry.GetJobs(virtual_scope));
580  } else {
581  // scope and virtual scope are suitable
582  candidates &= (scope_registry.GetJobs(scope) |
583  scope_registry.GetJobs(virtual_scope));
584  }
585 
586  if (candidates.any() == false) {
587  ++k;
588  continue;
589  }
590  }
591 
592 
593  if (k->m_AnyJob) {
594  should_send = true;
595  }
596  else {
597  // Here: the client is new because old clients always have
598  // m_AnyJob set to true
599  if (affinities.any())
600  should_send = clients_registry.IsRequestedAffinity(
601  k->m_ClientNode,
602  affinities,
603  k->m_WnodeAff, reason);
604  }
605  if (should_send == false) {
606  if (k->m_ExclusiveNewAff) {
607  if (no_aff_jobs)
608  should_send = true;
609  else
610  if (affinities.any())
611  should_send = (affinities -
612  all_preferred_affs).any();
613  }
614  }
615 
616  if (should_send) {
617  k->m_HifreqNotifyLifetime = current_time + notif_highfreq_period;
619  if (be_random) {
620  targets.push_back(&(*(m_ActiveListeners.rbegin())));
621  } else {
622  x_SendNotificationPacket(k->m_Address, k->m_Port,
623  k->m_NewFormat, k->m_Reason);
624  }
625  k = m_PassiveListeners.erase(k);
626  continue;
627  }
628 
629  ++k;
630  }
631 
632  if (be_random && !targets.empty()) {
633  shuffle(targets.begin(), targets.end(), default_random_engine());
634 
635  x_SendNotificationPacket(targets[0]->m_Address,
636  targets[0]->m_Port,
637  targets[0]->m_NewFormat,
638  targets[0]->m_Reason);
639  CNSPreciseTime when = CNSPreciseTime::Current() + notif_handicap;
640  for (size_t j(1); j < targets.size(); ++j)
641  x_AddToExactNotifications(targets[j]->m_Address,
642  targets[j]->m_Port,
643  when,
644  targets[j]->m_NewFormat,
645  targets[j]->m_Reason);
646 
647  // This will reschedule when the thread should wake up next time
649  }
650 }
651 
652 
653 // When a queue is resumed the notifications should be sent to
654 // worker nodes. This would allows them to come for a job straight
655 // after the queue is resumed.
657 {
658  if (any_pending) {
660  list<SNSNotificationAttributes>::iterator k = m_PassiveListeners.begin();
661 
662  for ( ; k != m_PassiveListeners.end(); ++k)
663  if (k->m_Reason == eGet)
664  x_SendNotificationPacket(k->m_Address, k->m_Port,
665  k->m_NewFormat, k->m_Reason);
666  }
667 
669  list<SQueueResumeNotification>::iterator k = m_QueueResumeNotifications.begin();
670 
671  for ( ; k != m_QueueResumeNotifications.end(); ++k)
672  x_SendNotificationPacket(k->m_Address, k->m_Port,
673  k->m_NewFormat, eGet);
675 }
676 
677 
678 // If a lock is released during printing, then there is a chance that
679 // the same entry is printed twice - as active and as inactive.
680 // Bearing in mind that the list of worker nodes waiting on notifications
681 // is going to be requested rare and rather manually, and that the printing
682 // is done into a string under the lock, it seems reasonable to avoid
683 // releasing the lock.
684 string
686  const CNSAffinityRegistry & aff_registry,
687  const CNSGroupsRegistry & group_registry,
688  bool verbose) const
689 {
690  string buffer;
691  list<SNSNotificationAttributes>::const_iterator current;
693 
694  // Approximation
695  buffer.reserve((m_ActiveListeners.size() +
696  m_PassiveListeners.size()) * 384);
697 
698 
699  for (current = m_ActiveListeners.begin();
700  current != m_ActiveListeners.end(); ++current)
701  buffer += current->Print(clients_registry, aff_registry,
702  group_registry, true, verbose);
703 
704  for (current = m_PassiveListeners.begin();
705  current != m_PassiveListeners.end(); ++current)
706  buffer += current->Print(clients_registry, aff_registry,
707  group_registry, false, verbose);
708 
709  return buffer;
710 }
711 
712 
713 void
715  unsigned int address,
716  unsigned short port,
717  const CNSPreciseTime & when,
718  bool new_format,
719  ECommandGroup reason)
720 {
721  // The records in the m_ExactTimeNotifications list should be sorted in
722  // accordance to the time when a notification should be sent.
723  // It is nearly always when a record must be added to the end except one
724  // case - if a handicap timeout has been decreased at runtime and there
725  // were records already in the list and the new record does not exceed the
726  // time when a previous record should be sent. It is also believed that
727  // such circumstances are very rare and even so that would not be very
728  // harmful if a notification is not sent immediately. Therefore the new
729  // records are always added at the end of the list.
730 
731  SExactTimeNotification notification;
732  notification.m_Address = address;
733  notification.m_Port = port;
734  notification.m_TimeToSend = when;
735  notification.m_NewFormat = new_format;
736  notification.m_Reason = reason;
737 
739  m_ExactTimeNotifications.push_back(notification);
740 }
741 
742 
743 void
745 {
747  list<SExactTimeNotification>::iterator k =
748  m_ExactTimeNotifications.begin();
749  while (k != m_ExactTimeNotifications.end()) {
750  if ( k->m_Reason == eGet)
751  k = m_ExactTimeNotifications.erase(k);
752  else
753  ++k;
754  }
755 }
756 
757 
758 // Notifies all those which exact time has reached.
759 // Returns the next exact time to send notification.
762 {
764  if (m_ExactTimeNotifications.empty())
765  return kTimeNever;
766 
768  while (!m_ExactTimeNotifications.empty()) {
769  list<SExactTimeNotification>::iterator first =
770  m_ExactTimeNotifications.begin();
771  if (first->m_TimeToSend > current)
772  return first->m_TimeToSend;
773 
774  // Here: send the notification and delete the record
775  x_SendNotificationPacket(first->m_Address,
776  first->m_Port,
777  first->m_NewFormat,
778  first->m_Reason);
780  }
781 
782  return kTimeNever;
783 }
784 
785 
786 void
788  unsigned short port,
789  bool new_format)
790 {
792  for (list<SQueueResumeNotification>::iterator
793  k = m_QueueResumeNotifications.begin();
794  k != m_QueueResumeNotifications.end(); ++k) {
795  if (k->m_Address == address && k->m_Port == port)
796  return;
797  }
798 
799  // Not found in the existing list. Add it.
801  notif.m_Address = address;
802  notif.m_Port = port;
803  notif.m_NewFormat = new_format;
804  m_QueueResumeNotifications.push_back(notif);
805 }
806 
807 
810  unsigned short port,
811  ECommandGroup cmd_group) const
812 {
814  for (list<SNSNotificationAttributes>::const_iterator
815  k = m_PassiveListeners.begin();
816  k != m_PassiveListeners.end(); ++k) {
817  if (k->m_Address == address &&
818  k->m_Port == port &&
819  k->m_Reason == cmd_group)
820  return k->m_Lifetime;
821  }
822  return kTimeZero;
823 }
824 
825 
826 void
828  unsigned int address,
829  unsigned short port,
830  bool new_format,
831  ECommandGroup reason)
832 {
834 
835  if (new_format) {
836  // Read notifications could only be of a new format
837  if (reason == eGet)
840  CSocketAPI::ntoa(address), port);
841  else
844  CSocketAPI::ntoa(address), port);
845  }
846  else
850  CSocketAPI::ntoa(address), port);
851 }
852 
853 
854 // Tests whether the lifetime of a single record is over.
855 // If so, it deletes the record and moves the iterator to the next record.
856 bool
858  const CNSPreciseTime & current_time,
859  CNSClientsRegistry & clients_registry,
860  list<SNSNotificationAttributes> & container,
861  list<SNSNotificationAttributes>::iterator & record)
862 {
863  if (current_time > record->m_Lifetime) {
864  // The record is out of date - remove it
865 
866  if (!record->m_ClientNode.empty())
867  // That was a new client, so we need to clean up the clients
868  // and affinity registry
869  clients_registry.CancelWaiting(record->m_ClientNode,
870  record->m_Reason, false);
871  record = container.erase(record);
872  return true;
873  }
874 
875  return false;
876 }
877 
878 
879 bool
881  unsigned short port) const
882 {
884  for (list<SExactTimeNotification>::const_iterator
885  k(m_ExactTimeNotifications.begin());
886  k != m_ExactTimeNotifications.end();
887  ++k) {
888  if (k->m_Address == address &&
889  k->m_Port == port)
890  return true;
891  }
892  return false;
893 }
894 
895 
896 // Get job notification thread implementation
898  CQueueDataBase & qdb,
899  unsigned int sec_delay,
900  unsigned int nanosec_delay,
901  const bool & logging) :
902  m_QueueDB(qdb), m_NotifLogging(logging),
903  m_Period(sec_delay, nanosec_delay),
904  m_StopSignal(0, 10000000),
905  m_StopFlag(false)
906 {}
907 
908 
910 {}
911 
912 
914 {
915  m_StopFlag = true;
916  m_StopSignal.Post();
917 }
918 
919 
921 {
922  m_StopSignal.Post();
923 }
924 
925 
927 {
928  SetCurrentThreadName("netscheduled_nt");
930 
931  CNSPreciseTime delay = m_Period;
932  CNSPreciseTime current;
933  CNSPreciseTime next_exact;
934 
935  while (1) {
936  m_StopSignal.TryWait(delay.tv_sec, delay.tv_nsec);
937  if (m_StopFlag)
938  break;
939 
941  x_DoJob();
943  }
944 
945  // Sends notifications scheduled for an exact time
946  do {
947  next_exact = x_ProcessExactTimeNotifications();
948  current = CNSPreciseTime::Current();
949  } while (next_exact <= current);
950 
951  // Calculate delay to the next loop
952  if (next_exact < m_NextScheduled)
953  delay = next_exact - current;
954  else {
955  if (current < m_NextScheduled)
956  delay = m_NextScheduled - current;
957  else {
958  // Spin one more time
959  delay.tv_sec = 0;
960  delay.tv_nsec = 0;
961  }
962  }
963 
964  } // while (1)
965 
966  return 0;
967 }
968 
969 
972 {
973  try {
975  }
976  catch (exception & ex) {
977  RequestStop();
978  ERR_POST("Error during sending exact time scheduled notifications: "
979  << ex.what() << ". Notification thread has been stopped.");
980  }
981  catch (...) {
982  RequestStop();
983  ERR_POST("Unknown error during sending exact time scheduled "
984  "notifications. Notification thread has been stopped.");
985  }
986  return CNSPreciseTime::Never();
987 }
988 
989 
991 {
993  bool is_logging = m_NotifLogging;
994 
995  if (is_logging) {
996  ctx.Reset(new CRequestContext());
997  ctx->SetRequestID();
1000  .Print("_type", "get_job_notification_thread");
1001  ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
1002  }
1003 
1004 
1005  try {
1007  }
1008  catch (exception & ex) {
1009  RequestStop();
1010  ERR_POST("Error during notification: " << ex.what() <<
1011  " notification thread has been stopped.");
1012  if (is_logging)
1013  ctx->SetRequestStatus(
1015  }
1016  catch (...) {
1017  RequestStop();
1018  ERR_POST("Unknown error during notification. "
1019  "Notification thread has been stopped.");
1020  if (is_logging)
1021  ctx->SetRequestStatus(
1023  }
1024 
1025  if (is_logging) {
1027  ctx.Reset();
1029  }
1030 
1031 }
1032 
1033 
1034 list<SNSNotificationAttributes>::iterator
1036  list<SNSNotificationAttributes> & container,
1037  unsigned int address,
1038  unsigned short port,
1039  ECommandGroup cmd_group)
1040 {
1041  for (list<SNSNotificationAttributes>::iterator k = container.begin();
1042  k != container.end(); ++k) {
1043  if (k->m_Address == address &&
1044  k->m_Port == port &&
1045  k->m_Reason == cmd_group)
1046  return k;
1047  }
1048  return container.end();
1049 }
1050 
1052 
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CNSPreciseTime x_ProcessExactTimeNotifications(void)
CGetJobNotificationThread(CQueueDataBase &qdb, unsigned int sec_delay, unsigned int nanosec_delay, const bool &logging)
Definition: job.hpp:183
size_t GetLastEventIndex(void) const
Definition: job.hpp:258
const string & GetProgressMsg() const
Definition: job.hpp:232
string GetTokenByID(unsigned int aff_id) const
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
TNSBitVector GetWaitAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
void GetScopes(const string &client_node, string &scope, string &virtual_scope)
bool IsRequestedAffinity(const string &name, const TNSBitVector &aff, bool use_preferred, ECommandGroup cmd_group) const
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
unsigned int ResolveGroup(const string &group)
Definition: ns_group.cpp:180
void RestrictByGroup(const string &group, TNSBitVector &bv) const
Definition: ns_group.cpp:139
string BuildJobChangedNotification(const CJob &job, const string &job_key, TJobStatus job_status, ENotificationReason reason)
CFastMutex m_GetAndReadNotificationSocketLock
void x_SendNotificationPacket(unsigned int address, unsigned short port, bool new_format, ECommandGroup reason)
list< SExactTimeNotification > m_ExactTimeNotifications
char m_GetMsgBufferObsoleteVersion[k_MessageBufferSize]
CDatagramSocket m_GetAndReadNotificationSocket
void onQueueResumed(bool any_pending)
CNSPreciseTime GetPassiveNotificationLifetime(unsigned int address, unsigned short port, ECommandGroup cmd_group) const
char m_ReadMsgBuffer[k_MessageBufferSize]
void Notify(unsigned int job_id, unsigned int aff_id, CNSClientsRegistry &clients_registry, CNSAffinityRegistry &aff_registry, CNSGroupsRegistry &group_registry, CNSScopeRegistry &scope_registry, const CNSPreciseTime &notif_highfreq_period, const CNSPreciseTime &notif_handicap, ECommandGroup cmd_group)
void AddToQueueResumedNotifications(unsigned int address, unsigned short port, bool new_format)
list< SNSNotificationAttributes > m_PassiveListeners
void x_AddToExactNotifications(unsigned int address, unsigned short port, const CNSPreciseTime &when, bool new_format, ECommandGroup reason)
void NotifyJobChanges(unsigned int address, unsigned short port, const string &notification)
void UnregisterListener(const CNSClientId &client, unsigned short port, ECommandGroup cmd_group)
void CheckOutdatedJobs(const TNSBitVector &outdated_jobs, CNSClientsRegistry &clients_registry, const CNSPreciseTime &notif_highfreq_period, ECommandGroup cmd_group)
void ClearExactGetNotifications(void)
list< SQueueResumeNotification > m_QueueResumeNotifications
CQueueDataBase & m_QueueDB
bool x_TestTimeout(const CNSPreciseTime &current_time, CNSClientsRegistry &clients_registry, list< SNSNotificationAttributes > &container, list< SNSNotificationAttributes >::iterator &record)
void CheckTimeout(const CNSPreciseTime &current_time, CNSClientsRegistry &clients_registry, ECommandGroup cmd_group)
void NotifyPeriodically(const CNSPreciseTime &current_time, unsigned int notif_lofreq_mult, CNSClientsRegistry &clients_registry)
CFastMutex m_StatusNotificationSocketLock
CDatagramSocket m_StatusNotificationSocket
list< SNSNotificationAttributes > m_ActiveListeners
char m_GetMsgBuffer[k_MessageBufferSize]
list< SNSNotificationAttributes >::iterator x_FindListener(list< SNSNotificationAttributes > &container, unsigned int address, unsigned short port, ECommandGroup cmd_group)
void RegisterListener(const CNSClientId &client, unsigned short port, unsigned int timeout, bool wnode_aff, bool any_job, bool exclusive_new_affinity, bool new_format, const TNSBitVector &groups, ECommandGroup cmd_group)
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool verbose) const
CNSPreciseTime NotifyExactListeners(void)
CNSNotificationList(CQueueDataBase &qdb, const string &ns_node, const string &qname)
bool x_IsInExactList(unsigned int address, unsigned short port) const
static CNSPreciseTime Never(void)
static CNSPreciseTime Current(void)
TNSBitVector GetAllJobsInScopes(void) const
Definition: ns_scope.cpp:86
TNSBitVector GetJobs(const string &scope) const
Definition: ns_scope.cpp:74
@ eStatus_OK
Command is ok and execution is good.
Definition: ns_handler.hpp:97
@ eStatus_ServerError
Internal server error.
Definition: ns_handler.hpp:108
void WakeupNotifThread(void)
CNSPreciseTime SendExactNotifications(void)
void NotifyListeners(void)
Constant iterator designed to enumerate "ON" bits.
Definition: bm.h:603
bool valid() const noexcept
Checks if iterator is still valid.
Definition: bm.h:283
Bitvector Bit-vector container with runtime compression of bits.
Definition: bm.h:115
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
Definition: bm.h:2451
bool set_bit(size_type n, bool val=true)
Sets bit n.
Definition: bm.h:4227
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
Definition: bm.h:1871
CS_CONTEXT * ctx
Definition: t0006.c:12
#define false
Definition: bool.h:36
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
Definition: dlist.tmpl.h:46
static const struct attribute attributes[]
Definition: attributes.c:165
#define NULL
Definition: ncbistd.hpp:225
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
Definition: ncbidiag.cpp:2778
CDiagContext & GetDiagContext(void)
Get diag context instance.
Definition: logging.cpp:818
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
Definition: ncbidiag.cpp:1907
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
Definition: ncbidiag.cpp:2762
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string ntoa(unsigned int host)
BSD-like API. NB: when int, "host" must be in network byte order.
EIO_Status Send(const void *data, size_t datalen, const string &host=string(), unsigned short port=0)
static string gethostbyaddr(unsigned int host, ESwitch log=eOff)
Return empty string on error.
static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)
URL-encode string.
Definition: ncbistr.cpp:6053
unsigned short m_Port
TCP port to listen on.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
Definition: ncbithr.cpp:958
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
Definition: ncbimtx.cpp:1844
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
NetSchedule job groups.
const size_t k_MessageBufferSize
const CNSPreciseTime kTimeZero
const CNSPreciseTime kTimeNever
string NS_FormatPreciseTime(const CNSPreciseTime &t)
const string kNoScopeOnly
Definition: ns_scope.hpp:55
ENotificationReason
Definition: ns_types.hpp:62
@ eProgressMessageChanged
Definition: ns_types.hpp:65
@ eStatusChanged
Definition: ns_types.hpp:63
@ eNotificationStolen
Definition: ns_types.hpp:64
ECommandGroup
Definition: ns_types.hpp:54
@ eGet
Definition: ns_types.hpp:55
static uint8_t * buffer
Definition: pcre2test.c:1016
true_type verbose
Definition: processing.cpp:878
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
CNSPreciseTime m_HifreqNotifyLifetime
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool is_active, bool verbose) const
Modified on Fri Sep 20 14:57:34 2024 by modify_doxy.py rev. 669887