NCBI C++ ToolKit
netschedule_api_getjob.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: netschedule_api_getjob.cpp 90047 2020-05-06 16:16:59Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Rafael Sadyrov
27  *
28  * File Description:
29  * NetSchedule API get/read job implementation.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "grid_worker_impl.hpp"
36 #include "netschedule_api_impl.hpp"
37 #include "netservice_api_impl.hpp"
38 
40 
42 
43 
// Forward declarations of the two job-holder templates defined later in this
// file; GetJob() instantiates one or the other depending on `any_affinity`.
44 template <class TImpl> class CAnyAffinityJob;
45 template <class TImpl> class CMostAffinityJob;
46 
// Entry point for obtaining a job: selects a job-holder strategy based on
// `any_affinity` and delegates the actual search to GetJobImpl().
//
// deadline      Passed through unchanged to GetJobImpl().
// any_affinity  true  -> CAnyAffinityJob holder over m_ImmediateActions;
//               false -> ReturnNotFullyCheckedServers() is called first, then a
//                        CMostAffinityJob holder (which also receives m_Impl).
// Returns whatever GetJobImpl() returns.
//
// NOTE(review): the declarator and the `job` / `job_status` parameter lines are
// absent from this listing; the reference index at the end of the page gives
// the signature as
//   CNetScheduleGetJob::EResult GetJob(const CDeadline&, CNetScheduleJob&,
//       CNetScheduleAPI::EJobStatus*, bool) -- confirm against the real source.
47 template <class TImpl>
49  const CDeadline& deadline,
52  bool any_affinity)
53 {
54  if (any_affinity) {
    // Any job will do: the holder is built without access to m_Impl.
55  CAnyAffinityJob<TImpl> holder(job, job_status, m_ImmediateActions);
56  return GetJobImpl(deadline, holder);
57  } else {
    // Bring back servers that were postponed before all affinities were tried,
    // then search using the affinity-aware holder.
58  ReturnNotFullyCheckedServers();
59  CMostAffinityJob<TImpl> holder(job, job_status, m_ImmediateActions, m_Impl);
60  return GetJobImpl(deadline, holder);
61  }
62 }
63 
// Container aliases shared by the helpers in this file.
// TServers  - list of server addresses (used as the discovery result).
// TTimeline - list of CNetScheduleGetJob::SEntry; both m_ImmediateActions and
//             m_ScheduledActions are passed around as this type.
// TIterator - iterator into a TTimeline.
64 typedef list<SSocketAddress> TServers;
65 typedef list<CNetScheduleGetJob::SEntry> TTimeline;
66 typedef TTimeline::iterator TIterator;
67 
// Job holder used by GetJob() when any affinity is acceptable.
// It keeps no search state: every query method returns a constant, so the
// first job obtained by GetJobImmediately() ends the search (Done() is always
// true) and iteration always restarts from the head of the timeline.
//
// NOTE(review): the class head, the public `job` / `job_status` members, the
// first line of the constructor and the private member declaration are absent
// from this listing; the initializer list below shows the members are `job`,
// `job_status` and `m_Timeline`.
68 template <class TImpl>
70 {
71 public:
74 
76  TTimeline& timeline) :
77  job(j), job_status(js), m_Timeline(timeline)
78  {}
79 
    // Nothing to undo on interruption.
80  void Interrupt() {}
    // Both the initial and every subsequent position are the timeline head;
    // the `increment` argument of Next() is ignored.
81  TIterator Begin() { return m_Timeline.begin(); }
82  TIterator Next(bool) { return m_Timeline.begin(); }
    // No affinity restriction is passed to the server query.
83  const string& Affinity() const { return kEmptyStr; }
    // Any returned job is final.
84  bool Done() { return true; }
    // Always false, so GetJobImmediately() computes `any_affinity` as true and
    // reports eAgain when it runs out of immediate actions.
85  bool HasJob() const { return false; }
86 
87 private:
89 };
90 
// Job holder used by GetJob() when `any_affinity` is false. It keeps the best
// job found so far and continues querying servers for a job whose affinity
// ranks higher in m_API->m_AffinityLadder, returning superseded jobs through
// m_GetJobImpl.ReturnJob().
//
// m_JobPriority is an index into the affinity ladder (0 = highest priority).
// It starts at numeric_limits<size_t>::max() and is set to the ladder size for
// a job whose affinity is not on the ladder.
//
// NOTE(review): the class head, the public/private member declarations, the
// head of Begin(), the body of HasJob() and two statements following the
// "Must not happen" comments are absent from this listing. Comments touching
// those parts are hedged accordingly.
91 template <class TImpl>
93 {
94 public:
97 
99  TTimeline& timeline, TImpl& get_job_impl) :
100  job(j), job_status(js), m_JobPriority(numeric_limits<size_t>::max()),
101  m_Timeline(timeline), m_Iterator(timeline.end()),
102  m_GetJobImpl(get_job_impl)
103  {
    // The holder indexes into the ladder, so it must not be empty.
104  _ASSERT(m_GetJobImpl.m_API->m_AffinityLadder.size());
105  }
106 
    // Called when the search is stopped: hands the currently held job (if any)
    // back via ReturnJob() and resets `job`.
107  void Interrupt()
108  {
109  if (HasJob()) {
110  m_GetJobImpl.ReturnJob(job);
111  job.Reset();
112  }
113  }
114 
    // Begin(): returns the head of the timeline.
    // NOTE(review): the function head and its first statement are missing here;
    // presumably that statement resets m_Iterator -- confirm.
116  {
118  return m_Timeline.begin();
119  }
120 
    // Returns the next timeline entry to query.
    // m_Iterator == end() means no position has been recorded yet. When
    // `increment` is true the recorded position is advanced first (or set to
    // the head if none was recorded); the result is the entry after the
    // recorded position, or the head when nothing is recorded.
121  TIterator Next(bool increment)
122  {
123  if (increment) {
124  if (m_Iterator == m_Timeline.end()) {
125  m_Iterator = m_Timeline.begin();
126  } else {
127  ++m_Iterator;
128  }
129 
130  // We've already got a job from an entry at m_Iterator + 1
131  // (that is why increment is true), so must not happen
132  _ASSERT(m_Iterator != m_Timeline.end());
133 
134  } else if (m_Iterator == m_Timeline.end()) {
135  return m_Timeline.begin();
136  }
137 
138  TIterator ret = m_Iterator;
139  return ++ret;
140  }
141 
    // Returns the affinity list to send with the next server query, taken from
    // the `.second` member of a ladder element: the element just above the
    // current job's rank when a job is held, otherwise the last element.
    // presumably `.second` holds the cumulative affinity list for that rank --
    // TODO confirm against the code that builds m_AffinityLadder.
142  const string& Affinity() const
143  {
144  // Must not happen, since otherwise Done() has returned true already
146 
148  affinity_ladder(m_GetJobImpl.m_API->m_AffinityLadder);
149 
150  if (HasJob()) {
151  // Only affinities that are higher than current job's one
152  return affinity_ladder[m_JobPriority - 1].second;
153  } else {
154  // All affinities
155  return affinity_ladder.back().second;
156  }
157  }
158 
    // Registers the job just written into `job` and reports whether the search
    // can stop. Returns true only when the job's affinity matches ladder
    // element 0; otherwise records the job's rank and returns false so that
    // the remaining servers are still queried.
159  bool Done()
160  {
161  // Must not happen, since otherwise Done() has returned true already
163 
164  // Return a less-priority job back
165  if (HasJob()) {
166  m_GetJobImpl.ReturnJob(m_PreviousJob);
167  }
168 
    // Remember the new job so it can be returned if a better one arrives.
169  m_PreviousJob = job;
170 
172  affinity_ladder(m_GetJobImpl.m_API->m_AffinityLadder);
173 
    // Start from the rank just above the current one; min() clamps the initial
    // sentinel (and the "not on the ladder" value) to the last ladder element.
174  size_t priority = min(affinity_ladder.size(), m_JobPriority) - 1;
175 
    // Scan towards rank 0 looking for the job's affinity in `.first`.
176  do {
177  if (job.affinity == affinity_ladder[priority].first) {
178  m_JobPriority = priority;
179 
180  // Return true, if job has the highest priority (zero)
181  return !m_JobPriority;
182  }
183  } while (priority-- > 0);
184 
185  // Whether affinities not from the ladder are allowed
    // NOTE(review): the right-hand side of this comparison and the statement
    // after ERR_POST are missing from this listing.
186  if (m_GetJobImpl.m_API->m_AffinityPreference ==
188  // Make it the least-priority
189  m_JobPriority = affinity_ladder.size();
190  } else {
191  // Should not happen
192  ERR_POST("Got a job " << job.job_id <<
193  " with unexpected affinity " << job.affinity);
195  }
196 
197  return false;
198  }
199 
    // Whether a job is currently held.
    // NOTE(review): the body is missing from this listing; presumably it tests
    // m_JobPriority against its initial sentinel -- confirm.
200  bool HasJob() const
201  {
203  }
204 
205 private:
209  TImpl& m_GetJobImpl;
211 };
212 
// Queries the entries of m_ImmediateActions without waiting, letting `holder`
// decide the iteration order and when a job is good enough.
//
// Returns:
//   eInterrupt - m_Impl.CheckState() reported eStopped (holder.Interrupt() is
//                called first);
//   eJob       - holder.Done() returned true, or the immediate actions were
//                exhausted (or a non-connection exception occurred) while the
//                holder already had a job;
//   eAgain     - the immediate actions were exhausted and no job is held.
// Exceptions other than CNetSrvConnException are rethrown when no job is held.
//
// Side effects visible here: entries that return no job are given a new
// deadline and moved to the end of m_ScheduledActions; entries that raise an
// exception are erased; expired scheduled entries and servers reported by
// m_Impl.ReadNotifications() are moved into m_ImmediateActions.
//
// NOTE(review): the declarator line is absent from this listing; the reference
// index at the end of the page gives it as
//   EResult GetJobImmediately(TJobHolder &holder).
213 template <class TImpl>
214 template <class TJobHolder>
216 {
217  TIterator i = holder.Begin();
218 
219  for (;;) {
220  EState state = m_Impl.CheckState();
221 
222  if (state == eStopped) {
223  holder.Interrupt();
224  return eInterrupt;
225  }
226 
    // A restart rebuilds both timelines, so iteration starts over.
227  if (state == eRestarted) {
228  Restart();
229  i = holder.Begin();
230  continue;
231  }
232 
233  // We must check i here to let state be checked before leaving loop
234  if (i == m_ImmediateActions.end()) {
235  return holder.HasJob() ? eJob : eAgain;
236  }
237 
    // The discovery action is a timeline entry too: when reached, refresh the
    // server list and start over.
238  if (*i == m_DiscoveryAction) {
239  NextDiscoveryIteration();
240  i = holder.Begin();
241  continue;
242  }
243 
244  // Whether to move to the next entry
245  // (false means we are already at the next entry due to splice/erase)
246  bool increment = false;
247 
248  try {
249  // Get prioritized affinity list and
250  // a flag whether any affinity job is appropriate
251  const string& prio_aff_list = holder.Affinity();
252  const bool any_affinity = !holder.HasJob();
253 
254  if (m_Impl.CheckEntry(*i, prio_aff_list, any_affinity,
255  holder.job, holder.job_status)) {
256  if (i == m_ImmediateActions.begin()) {
257  increment = true;
258  } else {
259  // We have got a more prioritized job from this server.
260  // Move this server to the top of immediate actions,
261  // so we will have servers ordered (most-to-least)
262  // by affinities of the jobs they have returned last
263  m_ImmediateActions.splice(m_ImmediateActions.begin(),
264  m_ImmediateActions, i);
265  }
266 
267  // A job has been returned; keep the server in
268  // immediate actions because there can be more
269  // jobs in the queue.
270  if (holder.Done()) {
271  return eJob;
272  }
273  } else {
274  // No job has been returned by this server;
275  // query the server later.
276  i->deadline = CDeadline(m_Impl.m_Timeout, 0);
    // Remember whether this query was unrestricted; entries with false here are
    // the ones ReturnNotFullyCheckedServers() moves back to immediate actions.
277  i->all_affinities_checked = any_affinity;
278  m_ScheduledActions.splice(m_ScheduledActions.end(),
279  m_ImmediateActions, i);
280  }
281  }
282  catch (CNetSrvConnException& e) {
283  // Because a connection error has occurred, do not
284  // put this server back to the timeline.
285  m_ImmediateActions.erase(i);
286  ERR_POST(Warning << e.GetMsg());
287  }
288  catch (...) {
    // Drop the entry; a job already held is still delivered, otherwise the
    // exception propagates to the caller.
289  m_ImmediateActions.erase(i);
290 
291  if (holder.HasJob()) {
292  return eJob;
293  }
294 
295  throw;
296  }
297 
298  // Check all servers that have timeout expired
299  while (!m_ScheduledActions.empty() &&
300  m_ScheduledActions.front().deadline.GetRemainingTime().IsZero()) {
301  m_ImmediateActions.splice(m_ImmediateActions.end(),
302  m_ScheduledActions, m_ScheduledActions.begin());
303  }
304 
305  // Check if there's a notification in the UDP socket.
306  while (CNetServer server = m_Impl.ReadNotifications()) {
307  MoveToImmediateActions(server);
308  }
309 
    // `i` may have been spliced or erased above, so the holder supplies the
    // next position instead of incrementing `i` directly.
310  i = holder.Next(increment);
311  }
312 }
313 
// Repeats GetJobImmediately() until it yields something other than eAgain,
// waiting for server notifications between attempts.
//
// deadline  Overall time limit for the wait.
// holder    Job holder passed through to GetJobImmediately().
// Returns:
//   any non-eAgain result of GetJobImmediately();
//   eNoJobs - m_Impl.MoreJobs() is false for every entry of m_ScheduledActions;
//   eAgain  - `deadline` has expired, or the final wait (the one bounded by
//             `deadline`) ended without a notification.
//
// NOTE(review): the first declarator line is absent from this listing; the
// reference index at the end of the page gives the signature as
//   EResult GetJobImpl(const CDeadline &deadline, TJobHolder &holder).
314 template <class TImpl>
315 template <class TJobHolder>
317  const CDeadline& deadline, TJobHolder& holder)
318 {
319  for (;;) {
320  EResult ret = GetJobImmediately(holder);
321 
322  if (ret != eAgain) {
323  return ret;
324  }
325 
326  auto entry_has_more_jobs = [&](const SEntry& entry) {
327  return m_Impl.MoreJobs(entry);
328  };
329 
330  // If MoreJobs() returned false for all entries of m_ScheduledActions
331  if (find_if(m_ScheduledActions.begin(), m_ScheduledActions.end(),
332  entry_has_more_jobs) == m_ScheduledActions.end()) {
333  return eNoJobs;
334  }
335 
336  if (deadline.IsExpired())
337  return eAgain;
338 
339  // At least, the discovery action must be there
340  _ASSERT(!m_ScheduledActions.empty());
341 
342  // There's still time. Wait for notifications and query the servers.
    // Wait until the earlier of the caller's deadline and the first scheduled
    // entry's deadline; `last_wait` records that the caller's deadline won.
343  CDeadline next_event_time = m_ScheduledActions.front().deadline;
344  bool last_wait = deadline < next_event_time;
345  if (last_wait) next_event_time = deadline;
346 
347  if (CNetServer server = m_Impl.WaitForNotifications(next_event_time)) {
    // Move the notifying server, plus any others already reported, to the
    // immediate actions.
348  do {
349  MoveToImmediateActions(server);
350  } while ((server = m_Impl.ReadNotifications()));
351  } else if (last_wait) {
352  return eAgain;
353  } else {
    // No notification arrived before the first scheduled entry's deadline:
    // make that entry immediate.
354  m_ImmediateActions.splice(m_ImmediateActions.end(),
355  m_ScheduledActions, m_ScheduledActions.begin());
356  }
357  }
358 }
359 
360 inline void Filter(TTimeline& timeline, TServers& servers)
361 {
362  TTimeline::iterator i = timeline.begin();
363 
364  while (i != timeline.end()) {
365  const SSocketAddress& address(i->server_address);
366  TServers::iterator j = find(servers.begin(), servers.end(), address);
367 
368  // If this server is still valid
369  if (j != servers.end()) {
370  servers.erase(j);
371  ++i;
372  } else {
373  timeline.erase(i++);
374  }
375  }
376 }
377 
// Discards both timelines and runs a discovery iteration to repopulate them.
// NOTE(review): the declarator line is absent from this listing; this is
// presumably the Restart() called from GetJobImmediately() -- confirm.
378 template <class TImpl>
380 {
381  // Rediscover all servers
382  m_ImmediateActions.clear();
383  m_ScheduledActions.clear();
384  NextDiscoveryIteration();
385 }
386 
// Makes sure the given server has an entry in m_ImmediateActions:
//  - if it has an entry in m_ScheduledActions, that entry is moved to the end
//    of m_ImmediateActions;
//  - otherwise, if it is not already in m_ImmediateActions, a new entry is
//    appended;
//  - otherwise nothing changes.
// NOTE(review): the declarator line is absent from this listing; the reference
// index at the end of the page gives it as
//   void MoveToImmediateActions(SNetServerImpl *server_impl).
387 template <class TImpl>
389 {
    // Lookup key built from the server's address.
390  SEntry entry(server_impl->m_ServerInPool->m_Address);
391 
392  TTimeline::iterator i = find(m_ScheduledActions.begin(),
393  m_ScheduledActions.end(), entry);
394 
395  // Server was postponed, move to immediate
396  if (i != m_ScheduledActions.end()) {
397  m_ImmediateActions.splice(m_ImmediateActions.end(),
398  m_ScheduledActions, i);
399  return;
400  }
401 
402  TTimeline::iterator j = find(m_ImmediateActions.begin(),
403  m_ImmediateActions.end(), entry);
404 
405  // It's new server, add to immediate
406  if (j == m_ImmediateActions.end()) {
407  m_ImmediateActions.push_back(entry);
408  }
409 }
410 
// Refreshes the timelines from the current service iteration:
//  1. collects the addresses of all servers yielded by
//     GetService().Iterate(CNetService::eIncludePenalized);
//  2. uses Filter() to drop timeline entries whose address was not collected
//     (Filter() also removes matched addresses from the collected list);
//  3. appends the addresses still left in the list to m_ImmediateActions;
//  4. gives m_DiscoveryAction a new deadline based on m_Impl.m_Timeout and
//     appends it to m_ScheduledActions.
// NOTE(review): the declarator line is absent from this listing; this is
// presumably the NextDiscoveryIteration() called elsewhere in this file --
// confirm.
411 template <class TImpl>
413 {
414  TServers servers;
415 
416  for (CNetServiceIterator it =
417  m_Impl.m_API.GetService().Iterate(
418  CNetService::eIncludePenalized); it; ++it) {
419  servers.push_back((*it)->m_ServerInPool->m_Address);
420  }
421 
422  // Keep up to date servers
423  Filter(m_ImmediateActions, servers);
424  Filter(m_ScheduledActions, servers);
425 
426  // Add newly discovered servers
427  for (TServers::const_iterator i = servers.begin();
428  i != servers.end(); ++i) {
429  m_ImmediateActions.push_back(*i);
430  }
431 
432  // Reschedule discovery after timeout
433  m_DiscoveryAction.deadline = CDeadline(m_Impl.m_Timeout, 0);
434  m_ScheduledActions.push_back(m_DiscoveryAction);
435 }
436 
// Moves every entry of m_ScheduledActions whose all_affinities_checked flag is
// false to the end of m_ImmediateActions, leaving the other entries in place.
// NOTE(review): the declarator line is absent from this listing; this is
// presumably the ReturnNotFullyCheckedServers() called from GetJob() --
// confirm.
437 template <class TImpl>
439 {
440  // Return back to immediate actions
441  // all servers that have not been checked for all possible affinities
442  TIterator i = m_ScheduledActions.begin();
443 
444  while (i != m_ScheduledActions.end()) {
445  if (i->all_affinities_checked) {
446  ++i;
447  } else {
    // Post-increment: `i` must step to the next scheduled entry before the
    // current element is transferred to the other list.
448  m_ImmediateActions.splice(m_ImmediateActions.end(),
449  m_ScheduledActions, i++);
450  }
451  }
452 }
453 
454 
457 
458 
CNetScheduleJob & job
CNetScheduleAPI::EJobStatus * job_status
CAnyAffinityJob(CNetScheduleJob &j, CNetScheduleAPI::EJobStatus *js, TTimeline &timeline)
const string & Affinity() const
CDeadline.
Definition: ncbitime.hpp:1830
CMostAffinityJob(CNetScheduleJob &j, CNetScheduleAPI::EJobStatus *js, TTimeline &timeline, TImpl &get_job_impl)
TIterator Next(bool increment)
CNetScheduleJob m_PreviousJob
CNetScheduleAPI::EJobStatus * job_status
const string & Affinity() const
EResult GetJobImpl(const CDeadline &deadline, TJobHolder &holder)
EResult GetJobImmediately(TJobHolder &holder)
void MoveToImmediateActions(SNetServerImpl *server_impl)
CNetScheduleGetJob::EResult GetJob(const CDeadline &deadline, CNetScheduleJob &job, CNetScheduleAPI::EJobStatus *job_status, bool any_affinity)
Net Service exception.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
const string & GetMsg(void) const
Get message string.
Definition: ncbiexpt.cpp:461
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
EJobStatus
Job status codes.
string job_id
Output job key.
#define numeric_limits
Pre-declaration of the "numeric_limits<>" template Forcibly overrides (using preprocessor) the origin...
Definition: ncbi_limits.hpp:92
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define kEmptyStr
Definition: ncbistr.hpp:123
bool IsExpired(void) const
Check if the deadline is expired.
Definition: ncbitime.hpp:1855
int i
vector< CSeq_align const * >::const_iterator TIterator
list< SSocketAddress > TServers
TTimeline::iterator TIterator
list< CNetScheduleGetJob::SEntry > TTimeline
void Filter(TTimeline &timeline, TServers &servers)
T max(T x_, T y_)
T min(T x_, T y_)
vector< pair< string, string > > TAffinityLadder
Job description.
CRef< SNetServerInPool > m_ServerInPool
#define _ASSERT
Modified on Wed Jun 12 11:16:13 2024 by modify_doxy.py rev. 669887