58 static string s_True =
"TRUE\n";
59 static string s_False =
"FALSE\n";
63 "OK: RECEPIENT ADDRESS: " +
75 buffer +=
"OK: EXPLICIT AFFINITIES: n/a (available in VERBOSE mode)\n";
79 buffer +=
"OK: EXPLICIT AFFINITIES: CLIENT NOT FOUND\n";
86 buffer +=
"OK: EXPLICIT AFFINITIES:\n";
89 for ( ; en.
valid(); ++en)
94 buffer +=
"OK: EXPLICIT AFFINITIES: NONE\n";
99 buffer +=
"OK: USE PREFERRED AFFINITIES: ";
113 if (pref_aff.
any()) {
114 buffer +=
"OK: USE PREFERRED AFFINITIES:\n";
117 for ( ; en.
valid(); ++en)
122 buffer +=
"OK: USE PREFERRED AFFINITIES: NONE\n";
125 buffer +=
"OK: USE PREFERRED AFFINITIES: FALSE\n";
128 buffer +=
"OK: EXCLUSIVE NEW AFFINITY: ";
135 buffer +=
"OK: GROUPS: NONE\n";
138 buffer +=
"OK: GROUPS:\n";
140 for ( ; en.
valid(); ++en) {
143 buffer +=
"OK: '" + token +
"'\n";
144 }
catch (
const exception & ex) {
145 ERR_POST(
"Error resolving group number while printing "
146 "the notification registry: " << ex.what());
148 ERR_POST(
"Unknown resolving group number error while "
149 "printing the notification registry");
153 buffer +=
"OK: GROUPS: n/a (available in VERBOSE mode)\n";
169 buffer +=
"OK: HIGH FREQUENCY LIFE TIME: " +
172 buffer +=
"OK: HIGH FREQUENCY LIFE TIME: n/a\n";
174 buffer +=
"OK: SLOW RATE ACTIVE: ";
185 const string & ns_node,
186 const string & qname) :
187 m_JobChangeNotifConstPart(
"ns_node=" + ns_node +
"&job_key="),
191 "reason=get&ns_node=%s&queue=%s",
192 ns_node.c_str(), qname.c_str()) + 1;
194 "reason=read&ns_node=%s&queue=%s",
195 ns_node.c_str(), qname.c_str()) + 1;
199 "NCBI_JSQ_%s", qname.c_str()) + 1;
207 unsigned int timeout,
210 bool exclusive_new_affinity,
215 unsigned int address =
client.GetAddress();
216 list<SNSNotificationAttributes>::iterator found;
225 found->m_ClientNode =
client.GetNode();
226 found->m_WnodeAff = wnode_aff;
227 found->m_AnyJob = any_job;
228 found->m_ExclusiveNewAff = exclusive_new_affinity;
229 found->m_NewFormat = new_format;
230 found->m_Groups = groups;
231 found->m_HifreqNotifyLifetime =
kTimeZero;
232 found->m_SlowRate =
false;
233 found->m_SlowRateCount = 0;
234 found->m_Reason = reason;
240 if (found->m_ClientNode.empty())
241 found->m_AnyJob =
true;
264 attributes.m_ExclusiveNewAff = exclusive_new_affinity;
293 list<SNSNotificationAttributes>::iterator found;
310 const string & notification)
314 notification.size() + 1,
322 const string & job_key,
327 notification.reserve(2048);
338 notification +=
"status";
341 notification +=
"stolen";
344 notification +=
"progress";
347 notification +=
"unknown";
352 size_t encoded_msg_size = url_encoded_msg.size();
353 const size_t size_limit = 768;
355 if (encoded_msg_size > size_limit) {
356 size_t msg_size = progress_msg.size();
357 size_t truncate_count = encoded_msg_size - size_limit;
360 progress_msg.substr(0, msg_size - truncate_count)) +
361 "&msg_truncated=" + to_string(truncate_count);
363 notification +=
"&msg=" + url_encoded_msg;
377 list<SNSNotificationAttributes>::iterator rec;
397 if (cmd_group == rec->m_Reason) {
411 unsigned int notif_lofreq_mult,
423 if (k->m_SlowRate || current_time > k->m_HifreqNotifyLifetime) {
424 k->m_SlowRate =
true;
427 k->m_SlowRateCount += 1;
428 if (k->m_SlowRateCount > notif_lofreq_mult) {
429 k->m_SlowRateCount = 0;
433 k->m_NewFormat, k->m_Reason);
435 k->m_NewFormat, k->m_Reason);
441 k->m_NewFormat, k->m_Reason);
461 if (k->m_ExclusiveNewAff) {
462 candidates = outdated_jobs;
465 if (candidates.
any()) {
467 k->m_NewFormat, k->m_Reason);
470 k->m_HifreqNotifyLifetime = current_time +
471 notif_highfreq_period;
502 Notify(jobs, aff_ids, aff_id == 0,
503 clients_registry, aff_registry, group_registry, scope_registry,
504 notif_highfreq_period, notif_handicap, reason);
530 vector<SNSNotificationAttributes*> targets;
531 bool be_random = (notif_handicap.tv_sec != 0 ||
532 notif_handicap.tv_nsec != 0);
542 if (reason != k->m_Reason) {
548 bool should_send =
false;
555 if (candidates.
any() ==
false) {
561 if (k->m_Groups.any()) {
563 if (candidates.
any() ==
false) {
570 string virtual_scope;
571 clients_registry.
GetScopes(k->m_ClientNode, scope, virtual_scope);
573 if (!virtual_scope.empty()) {
579 scope_registry.
GetJobs(virtual_scope));
582 candidates &= (scope_registry.
GetJobs(scope) |
583 scope_registry.
GetJobs(virtual_scope));
586 if (candidates.
any() ==
false) {
599 if (affinities.
any())
603 k->m_WnodeAff, reason);
605 if (should_send ==
false) {
606 if (k->m_ExclusiveNewAff) {
610 if (affinities.
any())
611 should_send = (affinities -
612 all_preferred_affs).any();
617 k->m_HifreqNotifyLifetime = current_time + notif_highfreq_period;
623 k->m_NewFormat, k->m_Reason);
632 if (be_random && !targets.empty()) {
633 shuffle(targets.begin(), targets.end(), default_random_engine());
637 targets[0]->m_NewFormat,
638 targets[0]->m_Reason);
640 for (
size_t j(1); j < targets.size(); ++j)
644 targets[j]->m_NewFormat,
645 targets[j]->m_Reason);
663 if (k->m_Reason ==
eGet)
665 k->m_NewFormat, k->m_Reason);
673 k->m_NewFormat,
eGet);
691 list<SNSNotificationAttributes>::const_iterator current;
701 buffer += current->Print(clients_registry, aff_registry,
702 group_registry,
true,
verbose);
706 buffer += current->Print(clients_registry, aff_registry,
707 group_registry,
false,
verbose);
715 unsigned int address,
733 notification.
m_Port = port;
747 list<SExactTimeNotification>::iterator k =
750 if ( k->m_Reason ==
eGet)
769 list<SExactTimeNotification>::iterator
first =
771 if (
first->m_TimeToSend > current)
772 return first->m_TimeToSend;
792 for (list<SQueueResumeNotification>::iterator
795 if (k->m_Address == address && k->m_Port == port)
814 for (list<SNSNotificationAttributes>::const_iterator
817 if (k->m_Address == address &&
819 k->m_Reason == cmd_group)
820 return k->m_Lifetime;
828 unsigned int address,
860 list<SNSNotificationAttributes> & container,
861 list<SNSNotificationAttributes>::iterator & record)
863 if (current_time > record->m_Lifetime) {
866 if (!record->m_ClientNode.empty())
870 record->m_Reason,
false);
871 record = container.erase(record);
881 unsigned short port)
const
884 for (list<SExactTimeNotification>::const_iterator
888 if (k->m_Address == address &&
899 unsigned int sec_delay,
900 unsigned int nanosec_delay,
901 const bool & logging) :
902 m_QueueDB(qdb), m_NotifLogging(logging),
903 m_Period(sec_delay, nanosec_delay),
904 m_StopSignal(0, 10000000),
949 }
while (next_exact <= current);
953 delay = next_exact - current;
976 catch (exception & ex) {
978 ERR_POST(
"Error during sending exact time scheduled notifications: "
979 << ex.what() <<
". Notification thread has been stopped.");
983 ERR_POST(
"Unknown error during sending exact time scheduled "
984 "notifications. Notification thread has been stopped.");
1000 .Print(
"_type",
"get_job_notification_thread");
1008 catch (exception & ex) {
1010 ERR_POST(
"Error during notification: " << ex.what() <<
1011 " notification thread has been stopped.");
1013 ctx->SetRequestStatus(
1018 ERR_POST(
"Unknown error during notification. "
1019 "Notification thread has been stopped.");
1021 ctx->SetRequestStatus(
1034 list<SNSNotificationAttributes>::iterator
1036 list<SNSNotificationAttributes> & container,
1037 unsigned int address,
1038 unsigned short port,
1041 for (list<SNSNotificationAttributes>::iterator k = container.begin();
1042 k != container.end(); ++k) {
1043 if (k->m_Address == address &&
1044 k->m_Port == port &&
1045 k->m_Reason == cmd_group)
1048 return container.end();
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CNSPreciseTime m_NextScheduled
const bool & m_NotifLogging
CQueueDataBase & m_QueueDB
CNSPreciseTime x_ProcessExactTimeNotifications(void)
CGetJobNotificationThread(CQueueDataBase &qdb, unsigned int sec_delay, unsigned int nanosec_delay, const bool &logging)
~CGetJobNotificationThread()
size_t GetLastEventIndex(void) const
const string & GetProgressMsg() const
string GetTokenByID(unsigned int aff_id) const
void SubtractBlacklistedJobs(const CNSClientId &client, ECommandGroup cmd_group, TNSBitVector &bv) const
TNSBitVector GetAllPreferredAffinities(ECommandGroup cmd_group) const
bool CancelWaiting(CNSClient &client, ECommandGroup cmd_group, bool touch_notif_registry=true)
TNSBitVector GetWaitAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
void GetScopes(const string &client_node, string &scope, string &virtual_scope)
bool IsRequestedAffinity(const string &name, const TNSBitVector &aff, bool use_preferred, ECommandGroup cmd_group) const
TNSBitVector GetPreferredAffinities(const CNSClientId &client, ECommandGroup cmd_group) const
unsigned int ResolveGroup(const string &group)
void RestrictByGroup(const string &group, TNSBitVector &bv) const
string BuildJobChangedNotification(const CJob &job, const string &job_key, TJobStatus job_status, ENotificationReason reason)
CFastMutex m_GetAndReadNotificationSocketLock
void x_SendNotificationPacket(unsigned int address, unsigned short port, bool new_format, ECommandGroup reason)
list< SExactTimeNotification > m_ExactTimeNotifications
char m_GetMsgBufferObsoleteVersion[k_MessageBufferSize]
CDatagramSocket m_GetAndReadNotificationSocket
void onQueueResumed(bool any_pending)
CNSPreciseTime GetPassiveNotificationLifetime(unsigned int address, unsigned short port, ECommandGroup cmd_group) const
CMutex m_ExactTimeNotifLock
CMutex m_QueueResumeNotifLock
char m_ReadMsgBuffer[k_MessageBufferSize]
void Notify(unsigned int job_id, unsigned int aff_id, CNSClientsRegistry &clients_registry, CNSAffinityRegistry &aff_registry, CNSGroupsRegistry &group_registry, CNSScopeRegistry &scope_registry, const CNSPreciseTime ¬if_highfreq_period, const CNSPreciseTime ¬if_handicap, ECommandGroup cmd_group)
void AddToQueueResumedNotifications(unsigned int address, unsigned short port, bool new_format)
list< SNSNotificationAttributes > m_PassiveListeners
void x_AddToExactNotifications(unsigned int address, unsigned short port, const CNSPreciseTime &when, bool new_format, ECommandGroup reason)
void NotifyJobChanges(unsigned int address, unsigned short port, const string ¬ification)
void UnregisterListener(const CNSClientId &client, unsigned short port, ECommandGroup cmd_group)
void CheckOutdatedJobs(const TNSBitVector &outdated_jobs, CNSClientsRegistry &clients_registry, const CNSPreciseTime ¬if_highfreq_period, ECommandGroup cmd_group)
void ClearExactGetNotifications(void)
list< SQueueResumeNotification > m_QueueResumeNotifications
CQueueDataBase & m_QueueDB
bool x_TestTimeout(const CNSPreciseTime ¤t_time, CNSClientsRegistry &clients_registry, list< SNSNotificationAttributes > &container, list< SNSNotificationAttributes >::iterator &record)
void CheckTimeout(const CNSPreciseTime ¤t_time, CNSClientsRegistry &clients_registry, ECommandGroup cmd_group)
void NotifyPeriodically(const CNSPreciseTime ¤t_time, unsigned int notif_lofreq_mult, CNSClientsRegistry &clients_registry)
CFastMutex m_StatusNotificationSocketLock
CDatagramSocket m_StatusNotificationSocket
string m_JobChangeNotifConstPart
list< SNSNotificationAttributes > m_ActiveListeners
char m_GetMsgBuffer[k_MessageBufferSize]
list< SNSNotificationAttributes >::iterator x_FindListener(list< SNSNotificationAttributes > &container, unsigned int address, unsigned short port, ECommandGroup cmd_group)
size_t m_GetMsgLengthObsoleteVersion
void RegisterListener(const CNSClientId &client, unsigned short port, unsigned int timeout, bool wnode_aff, bool any_job, bool exclusive_new_affinity, bool new_format, const TNSBitVector &groups, ECommandGroup cmd_group)
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool verbose) const
CNSPreciseTime NotifyExactListeners(void)
CNSNotificationList(CQueueDataBase &qdb, const string &ns_node, const string &qname)
bool x_IsInExactList(unsigned int address, unsigned short port) const
static CNSPreciseTime Never(void)
static CNSPreciseTime Current(void)
TNSBitVector GetAllJobsInScopes(void) const
TNSBitVector GetJobs(const string &scope) const
@ eStatus_OK
Command is ok and execution is good.
@ eStatus_ServerError
Internal server error.
void WakeupNotifThread(void)
CNSPreciseTime SendExactNotifications(void)
void NotifyListeners(void)
Constant iterator designed to enumerate "ON" bits.
bool valid() const noexcept
Checks if iterator is still valid.
Bitvector Bit-vector container with runtime compression of bits.
bool any() const noexcept
Returns true if any bits in this bitset are set, and otherwise returns false.
bool set_bit(size_type n, bool val=true)
Sets bit n.
enumerator first() const
Returns enumerator pointing on the first non-zero bit.
static DLIST_TYPE *DLIST_NAME() first(DLIST_LIST_TYPE *list)
static const struct attribute attributes[]
void PrintRequestStop(void)
Print request stop message (for request-driven applications)
CDiagContext & GetDiagContext(void)
Get diag context instance.
static void SetRequestContext(CRequestContext *ctx)
Shortcut to CDiagContextThreadData::GetThreadData().SetRequestContext()
void PrintRequestStart(const string &message)
Print request start message (for request-driven applications)
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static string ntoa(unsigned int host)
BSD-like API. NB: when int, "host" must be in network byte order.
EIO_Status Send(const void *data, size_t datalen, const string &host=string(), unsigned short port=0)
static string gethostbyaddr(unsigned int host, ESwitch log=eOff)
Return empty string on error.
static string URLEncode(const CTempString str, EUrlEncode flag=eUrlEnc_SkipMarkChars)
URL-encode string.
unsigned short m_Port
TCP port to listen on.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
void Post(unsigned int count=1)
Increment the semaphore by "count".
const size_t k_MessageBufferSize
const CNSPreciseTime kTimeZero
const CNSPreciseTime kTimeNever
string NS_FormatPreciseTime(const CNSPreciseTime &t)
const string kNoScopeOnly
@ eProgressMessageChanged
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
CNSPreciseTime m_TimeToSend
CNSPreciseTime m_HifreqNotifyLifetime
CNSPreciseTime m_Lifetime
string Print(const CNSClientsRegistry &clients_registry, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry, bool is_active, bool verbose) const