NCBI C++ ToolKit
job.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: job.cpp 93717 2021-05-14 17:29:38Z satskyse $
2 * ===========================================================================
3 *
4 * PUBLIC DOMAIN NOTICE
5 * National Center for Biotechnology Information
6 *
7 * This software/database is a "United States Government Work" under the
8 * terms of the United States Copyright Act. It was written as part of
9 * the author's official duties as a United States Government employee and
10 * thus cannot be copyrighted. This software/database is freely available
11 * to the public for use. The National Library of Medicine and the U.S.
12 * Government have not placed any restriction on its use or reproduction.
13 *
14 * Although all reasonable efforts have been taken to ensure the accuracy
15 * and reliability of the software and data, the NLM and the U.S.
16 * Government do not and cannot warrant the performance or results that
17 * may be obtained by using this software or data. The NLM and the U.S.
18 * Government disclaim all warranties, express or implied, including
19 * warranties of performance, merchantability or fitness for any particular
20 * purpose.
21 *
22 * Please cite the author in any work or product based on this material.
23 *
24 * ===========================================================================
25 *
26 * Authors: Victor Joukov
27 *
28 * File Description:
29 * NetSchedule job
30 *
31 */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include <string.h>
36 
38 
39 #include "job.hpp"
40 #include "ns_queue.hpp"
41 #include "ns_handler.hpp"
42 #include "ns_command_arguments.hpp"
43 #include "ns_affinity.hpp"
44 #include "ns_group.hpp"
45 #include "ns_db_dump.hpp"
46 
47 
49 
50 
51 //////////////////////////////////////////////////////////////////////////
52 // CJobEvent implementation
53 
// Human-readable names of the CJobEvent::EJobEvent values.
//
// The table is indexed directly by the enumerator value, so the entries
// must follow the enumerator declaration order exactly: eSubmit first,
// eReread last. It is only ever read, so it is declared const to protect
// this file-scope data from accidental modification.
static const string s_EventAsString[] = {
    "Submit",               // eSubmit
    "BatchSubmit",          // eBatchSubmit
    "Request",              // eRequest
    "Done",                 // eDone
    "Return",               // eReturn
    "Fail",                 // eFail
    "FinalFail",            // eFinalFail
    "Read",                 // eRead
    "ReadFail",             // eReadFail
    "ReadFinalFail",        // eReadFinalFail
    "ReadDone",             // eReadDone
    "ReadRollback",         // eReadRollback
    "Clear",                // eClear
    "Cancel",               // eCancel
    "Timeout",              // eTimeout
    "ReadTimeout",          // eReadTimeout
    "SessionChanged",       // eSessionChanged
    "NSSubmitRollback",     // eNSSubmitRollback
    "NSGetRollback",        // eNSGetRollback
    "NSReadRollback",       // eNSReadRollback
    "ReturnNoBlacklist",    // eReturnNoBlacklist
    "Reschedule",           // eReschedule
    "Redo",                 // eRedo
    "Reread"                // eReread
};
80 
81 
83 {
84  if (event < eSubmit || event > eReread)
85  return "UNKNOWN";
86 
87  return s_EventAsString[ event ];
88 }
89 
90 
92  m_Status(CNetScheduleAPI::eJobNotFound),
93  m_Event(eUnknown),
94  m_Timestamp(0, 0),
95  m_NodeAddr(0),
96  m_RetCode(0)
97 {}
98 
99 
102  TJobStatus status,
103  const CNSPreciseTime & job_submit_time,
104  const CNSPreciseTime & job_timeout,
105  const CNSPreciseTime & job_run_timeout,
106  const CNSPreciseTime & job_read_timeout,
107  const CNSPreciseTime & queue_timeout,
108  const CNSPreciseTime & queue_run_timeout,
109  const CNSPreciseTime & queue_read_timeout,
110  const CNSPreciseTime & queue_pending_timeout,
111  const CNSPreciseTime & event_time)
112 {
113  CNSPreciseTime last_update = event_time;
114  if (last_update == kTimeZero)
115  last_update = last_touch;
116 
117  if (status == CNetScheduleAPI::eRunning) {
118  if (job_run_timeout != kTimeZero)
119  return last_update + job_run_timeout;
120  return last_update + queue_run_timeout;
121  }
122 
123  if (status == CNetScheduleAPI::eReading) {
124  if (job_read_timeout != kTimeZero)
125  return last_update + job_read_timeout;
126  return last_update + queue_read_timeout;
127  }
128 
129  if (status == CNetScheduleAPI::ePending) {
130  CNSPreciseTime regular_expiration = last_update +
131  queue_timeout;
132  if (job_timeout != kTimeZero)
133  regular_expiration = last_update + job_timeout;
134  CNSPreciseTime pending_expiration = job_submit_time +
135  queue_pending_timeout;
136 
137  if (regular_expiration < pending_expiration)
138  return regular_expiration;
139  return pending_expiration;
140  }
141 
142  if (job_timeout != kTimeZero)
143  return last_update + job_timeout;
144  return last_update + queue_timeout;
145 }
146 
147 
148 
149 
150 //////////////////////////////////////////////////////////////////////////
151 // CJob implementation
152 
154  m_Id(0),
155  m_Passport(0),
156  m_Status(CNetScheduleAPI::ePending),
157  m_Timeout(),
158  m_RunTimeout(),
159  m_ReadTimeout(),
160  m_SubmNotifPort(0),
161  m_SubmNotifTimeout(),
162  m_ListenerNotifAddress(0),
163  m_ListenerNotifPort(0),
164  m_ListenerNotifAbsTime(),
165  m_RunCount(0),
166  m_ReadCount(0),
167  m_AffinityId(0),
168  m_Mask(0),
169  m_GroupId(0),
170  m_LastTouch(),
171  m_NeedSubmProgressMsgNotif(false),
172  m_NeedLsnrProgressMsgNotif(false),
173  m_NeedStolenNotif(false)
174 {}
175 
176 
177 CJob::CJob(const SNSCommandArguments & request) :
178  m_Id(0),
179  m_Passport(0),
180  m_Status(CNetScheduleAPI::ePending),
181  m_Timeout(),
182  m_RunTimeout(),
183  m_ReadTimeout(),
184  m_SubmNotifPort(request.port),
185  m_SubmNotifTimeout(request.timeout, 0),
186  m_ListenerNotifAddress(0),
187  m_ListenerNotifPort(0),
188  m_ListenerNotifAbsTime(),
189  m_RunCount(0),
190  m_ReadCount(0),
191  m_ProgressMsg(""),
192  m_AffinityId(0),
193  m_Mask(request.job_mask),
194  m_GroupId(0),
195  m_LastTouch(),
196  m_Output(""),
197  m_NeedSubmProgressMsgNotif(request.need_progress_msg),
198  m_NeedLsnrProgressMsgNotif(false),
199  m_NeedStolenNotif(false)
200 {
201  SetClientIP(request.ip);
202  SetClientSID(request.sid);
203  SetNCBIPHID(request.ncbi_phid);
204  SetInput(request.input);
205 }
206 
207 
208 string CJob::GetErrorMsg() const
209 {
210  if (m_Events.empty())
211  return kEmptyStr;
212  return GetLastEvent()->GetErrorMsg();
213 }
214 
215 
216 int CJob::GetRetCode() const
217 {
218  if (m_Events.empty())
219  return ~0;
220  return GetLastEvent()->GetRetCode();
221 }
222 
223 
224 void CJob::SetInput(const string & input)
225 {
226  m_Input = input;
227 }
228 
229 
230 void CJob::SetOutput(const string & output)
231 {
232  m_Output = output;
233 }
234 
235 
237 {
238  m_Events.push_back(CJobEvent());
239  return m_Events[m_Events.size() - 1];
240 }
241 
242 
244 {
245  // The first event is attached when a job is submitted, so
246  // there is no way to have the events empty
247  return &(m_Events[m_Events.size() - 1]);
248 }
249 
250 
252 {
253  // The first event is attached when a job is submitted, so
254  // there is no way to have the events empty
255  return &(m_Events[m_Events.size() - 1]);
256 }
257 
258 
260 CJob::CompareAuthToken(const string & auth_token) const
261 {
262  vector<string> parts;
263 
264  NStr::Split(auth_token, "_", parts);
265  if (parts.size() < 2)
266  return eInvalidTokenFormat;
267 
268  try {
269  if (NStr::StringToUInt(parts[0]) != m_Passport)
270  return eNoMatch;
271  if (NStr::StringToSizet(parts[1]) != m_Events.size())
272  return ePassportOnlyMatch;
273  } catch (...) {
274  // Cannot convert the value
275  return eInvalidTokenFormat;
276  }
277  return eCompleteMatch;
278 }
279 
280 
281 bool CJob::ShouldNotifySubmitter(const CNSPreciseTime & current_time) const
282 {
283  // The very first event is always a submit
285  return m_Events[0].m_Timestamp +
286  CNSPreciseTime(m_SubmNotifTimeout, 0) >= current_time;
287  return false;
288 }
289 
290 
291 bool CJob::ShouldNotifyListener(const CNSPreciseTime & current_time) const
292 {
294  return m_ListenerNotifAbsTime >= current_time;
295  return false;
296 }
297 
298 
299 // Used to DUMP a job
300 string CJob::Print(TDumpFields dump_fields,
301  const CQueue & queue,
302  const CNSAffinityRegistry & aff_registry,
303  const CNSGroupsRegistry & group_registry) const
304 {
305  CNSPreciseTime timeout = m_Timeout;
306  CNSPreciseTime run_timeout = m_RunTimeout;
307  CNSPreciseTime read_timeout = m_ReadTimeout;
308  CNSPreciseTime pending_timeout = queue.GetPendingTimeout();
309  CNSPreciseTime queue_timeout = queue.GetTimeout();
310  CNSPreciseTime queue_run_timeout = queue.GetRunTimeout();
311  CNSPreciseTime queue_read_timeout = queue.GetReadTimeout();
312  string result;
313 
314  result.reserve(2048); // Voluntary; must be enough for most of the cases
315 
316  if (m_Timeout == kTimeZero)
317  timeout = queue_timeout;
318  if (m_RunTimeout == kTimeZero)
319  run_timeout = queue_run_timeout;
320  if (m_ReadTimeout == kTimeZero)
321  read_timeout = queue_read_timeout;
322 
323  CNSPreciseTime exp_time;
326  exp_time = GetExpirationTime(queue_timeout,
327  queue_run_timeout,
328  queue_read_timeout,
329  pending_timeout,
330  GetLastEventTime());
331  else
332  exp_time = GetExpirationTime(queue_timeout,
333  queue_run_timeout,
334  queue_read_timeout,
335  pending_timeout,
336  m_LastTouch);
337 
338  if (dump_fields & eId)
340  if (dump_fields & eKey)
341  x_AppendKey(queue, result);
342  if (dump_fields & eStatus)
344  if (dump_fields & eLastTouch)
346  if (dump_fields & eEraseTime)
347  x_AppendEraseTime(timeout, pending_timeout, exp_time, result);
348  if (dump_fields & eRunExpiration)
349  x_AppendRunExpiration(run_timeout, exp_time, result);
350  if (dump_fields & eReadExpiration)
351  x_AppendReadExpiration(read_timeout, exp_time, result);
352  if (dump_fields & eSubmitNotifPort)
354  if (dump_fields & eSubmitNotifExpiration)
356  if (dump_fields & eListenerNotif)
358  if (dump_fields & eListenerNotifExpiration)
360  if (dump_fields & eEvents)
362  if (dump_fields & eRunCounter)
364  if (dump_fields & eReadCounter)
366  if (dump_fields & eAffinity)
367  x_AppendAffinity(aff_registry, result);
368  if (dump_fields & eGroup)
369  x_AppendGroup(group_registry, result);
370  if (dump_fields & eMask)
372  if (dump_fields & eInput)
374  if (dump_fields & eOutput)
376  if (dump_fields & eProgressMsg)
378  if (dump_fields & eRemoteClientSID)
380  if (dump_fields & eRemoteClientIP)
382  if (dump_fields & eNcbiPHID)
384  if (dump_fields & eNeedSubmitProgressMsgNotif)
386  if (dump_fields & eNeedListenerProgressMsgNotif)
388  if (dump_fields & eNeedStolenNotif)
390 
391  return result;
392 }
393 
394 void CJob::x_AppendId(string & dump) const
395 {
396  static string prefix = "OK:id: ";
397  dump.append(prefix)
398  .append(to_string(m_Id))
399  .append(kNewLine);
400 }
401 
402 void CJob::x_AppendKey(const CQueue & queue, string & dump) const
403 {
404  static string prefix = "OK:key: ";
405  dump.append(prefix)
406  .append(queue.MakeJobKey(m_Id))
407  .append(kNewLine);
408 }
409 
410 void CJob::x_AppendStatus(string & dump) const
411 {
412  static string prefix = "OK:status: ";
413  dump.append(prefix)
415  .append(kNewLine);
416 }
417 
418 void CJob::x_AppendLastTouch(string & dump) const
419 {
420  static string prefix = "OK:last_touch: ";
421  dump.append(prefix)
423  .append(kNewLine);
424 }
425 
427  const CNSPreciseTime & pending_timeout,
428  const CNSPreciseTime & exp_time,
429  string & dump) const
430 {
431  static string prefix = "OK:erase_time: ";
432  static string na_prefix = prefix + "n/a (timeout: ";
433  static string timeout_suffix = " (timeout: ";
434  static string postfix = " sec)" + kNewLine;
435  static string pending_suffix = " sec, pending timeout: ";
437  dump.append(na_prefix)
438  .append(NS_FormatPreciseTimeAsSec(timeout))
439  .append(pending_suffix)
440  .append(NS_FormatPreciseTimeAsSec(pending_timeout))
441  .append(postfix);
442  else
443  dump.append(prefix)
444  .append(NS_FormatPreciseTime(exp_time))
445  .append(timeout_suffix)
446  .append(NS_FormatPreciseTimeAsSec(timeout))
447  .append(pending_suffix)
448  .append(NS_FormatPreciseTimeAsSec(pending_timeout))
449  .append(postfix);
450 }
451 
453  const CNSPreciseTime & exp_time,
454  string & dump) const
455 {
456  static string prefix = "OK:run_expiration: ";
457  static string na_prefix = prefix + "n/a (timeout: ";
458  static string suffix = " (timeout: ";
459  static string postfix = " sec)" + kNewLine;
461  dump.append(prefix)
462  .append(NS_FormatPreciseTime(exp_time))
463  .append(suffix)
464  .append(NS_FormatPreciseTimeAsSec(run_timeout))
465  .append(postfix);
466  else
467  dump.append(na_prefix)
468  .append(NS_FormatPreciseTimeAsSec(run_timeout))
469  .append(postfix);
470 }
471 
473  const CNSPreciseTime & exp_time,
474  string & dump) const
475 {
476  static string prefix = "OK:read_expiration: ";
477  static string na_prefix = prefix + "n/a (timeout: ";
478  static string suffix = " (timeout: ";
479  static string postfix = " sec)" + kNewLine;
481  dump.append(prefix)
482  .append(NS_FormatPreciseTime(exp_time))
483  .append(suffix)
484  .append(NS_FormatPreciseTimeAsSec(read_timeout))
485  .append(postfix);
486  else
487  dump.append(na_prefix)
488  .append(NS_FormatPreciseTimeAsSec(read_timeout))
489  .append(postfix);
490 }
491 
493 {
494  static string prefix = "OK:subm_notif_port: ";
495  static string na_reply = prefix + "n/a" + kNewLine;
496  if (m_SubmNotifPort != 0)
497  dump.append(prefix)
498  .append(to_string(m_SubmNotifPort))
499  .append(kNewLine);
500  else
501  dump.append(na_reply);
502 }
503 
505 {
506  static string prefix = "OK:subm_notif_expiration: ";
507  static string suffix = " (timeout: ";
508  static string postfix = " sec)" + kNewLine;
509  static string na_reply = prefix + "n/a" + kNewLine;
511  dump.append(prefix)
512  .append(NS_FormatPreciseTime(m_Events[0].m_Timestamp + m_SubmNotifTimeout))
513  .append(suffix)
515  .append(postfix);
516  else
517  dump.append(na_reply);
518 }
519 
520 void CJob::x_AppendListenerNotif(string & dump) const
521 {
522  static string prefix = "OK:listener_notif: ";
523  static string na_reply = prefix + "n/a" + kNewLine;
524  static string colon = ":";
526  dump.append(na_reply);
527  else
528  dump.append(prefix)
530  .append(colon)
531  .append(to_string(m_ListenerNotifPort))
532  .append(kNewLine);
533 }
534 
536 {
537  static string prefix = "OK:listener_notif_expiration: ";
538  static string na_reply = prefix + "n/a" + kNewLine;
540  dump.append(prefix)
542  .append(kNewLine);
543  else
544  dump.append(na_reply);
545 }
546 
547 void CJob::x_AppendEvents(string & dump) const
548 {
549  static string prefix = "OK:event";
550  static string client = ": client=";
551  static string ns = "ns";
552  static string event = " event=";
553  static string status = " status=";
554  static string ret_code = " ret_code=";
555  static string timestamp = " timestamp=";
556  static string na = "n/a ";
557  static string node = "' node='";
558  static string session = "' session='";
559  static string err_msg = "' err_msg=";
560  int event_no = 1;
561 
562  for (const auto & ev : m_Events) {
563  unsigned int addr = ev.GetNodeAddr();
564 
565  dump.append(prefix)
566  .append(to_string(event_no))
567  .append(client);
568  if (addr == 0)
569  dump.append(ns);
570  else
571  dump.append(CSocketAPI::gethostbyaddr(addr));
572 
573  dump.append(event)
574  .append(CJobEvent::EventToString(ev.GetEvent()))
575  .append(status)
576  .append(CNetScheduleAPI::StatusToString(ev.GetStatus()))
577  .append(ret_code)
578  .append(to_string(ev.GetRetCode()))
579  .append(timestamp);
580 
581  CNSPreciseTime start = ev.GetTimestamp();
582  if (start == kTimeZero)
583  dump.append(na);
584  else
585  dump.append(1, '\'')
586  .append(NS_FormatPreciseTime(start))
587  .append(node)
588  .append(ev.GetClientNode())
589  .append(session)
590  .append(ev.GetClientSession())
591  .append(err_msg)
592  .append(ev.GetQuotedErrorMsg())
593  .append(kNewLine);
594  ++event_no;
595  }
596 }
597 
598 void CJob::x_AppendRunCounter(string & dump) const
599 {
600  static string prefix = "OK:run_counter: ";
601  dump.append(prefix)
602  .append(to_string(m_RunCount))
603  .append(kNewLine);
604 }
605 
606 void CJob::x_AppendReadCounter(string & dump) const
607 {
608  static string prefix = "OK:read_counter: ";
609  dump.append(prefix)
610  .append(to_string(m_ReadCount))
611  .append(kNewLine);
612 }
613 
614 void CJob::x_AppendAffinity(const CNSAffinityRegistry & aff_registry,
615  string & dump) const
616 {
617  static string prefix = "OK:affinity: ";
618  static string na_reply = prefix + "n/a" + kNewLine;
619  static string open_paren = " ('";
620  static string close_paren = "')";
621  if (m_AffinityId != 0)
622  dump.append(prefix)
623  .append(to_string(m_AffinityId))
624  .append(open_paren)
625  .append(NStr::PrintableString(aff_registry.GetTokenByID(m_AffinityId)))
626  .append(close_paren)
627  .append(kNewLine);
628  else
629  dump.append(na_reply);
630 }
631 
632 void CJob::x_AppendGroup(const CNSGroupsRegistry & group_registry,
633  string & dump) const
634 {
635  static string prefix = "OK:group: ";
636  static string na_reply = prefix + "n/a" + kNewLine;
637  static string open_paren = " ('";
638  static string close_paren = "')";
639  if (m_GroupId != 0)
640  dump.append(prefix)
641  .append(to_string(m_GroupId))
642  .append(open_paren)
643  .append(NStr::PrintableString(group_registry.ResolveGroup(m_GroupId)))
644  .append(close_paren)
645  .append(kNewLine);
646  else
647  dump.append(na_reply);
648 }
649 
650 void CJob::x_AppendMask(string & dump) const
651 {
652  static string prefix = "OK:mask: ";
653  dump.append(prefix)
654  .append(to_string(m_Mask))
655  .append(kNewLine);
656 }
657 
658 void CJob::x_AppendInput(string & dump) const
659 {
660  static string prefix = "OK:input: '";
661  dump.append(prefix)
663  .append(1, '\'')
664  .append(kNewLine);
665 }
666 
667 void CJob::x_AppendOutput(string & dump) const
668 {
669  static string prefix = "OK:output: '";
670  dump.append(prefix)
672  .append(1, '\'')
673  .append(kNewLine);
674 }
675 
676 void CJob::x_AppendProgressMsg(string & dump) const
677 {
678  static string prefix = "OK:progress_msg: '";
679  dump.append(prefix)
680  .append(m_ProgressMsg)
681  .append(1, '\'')
682  .append(kNewLine);
683 }
684 
686 {
687  static string prefix = "OK:remote_client_sid: ";
688  dump.append(prefix)
690  .append(kNewLine);
691 }
692 
693 void CJob::x_AppendRemoteClientIP(string & dump) const
694 {
695  static string prefix = "OK:remote_client_ip: ";
696  dump.append(prefix)
698  .append(kNewLine);
699 }
700 
701 void CJob::x_AppendNcbiPhid(string & dump) const
702 {
703  static string prefix = "OK:ncbi_phid: ";
704  dump.append(prefix)
706  .append(kNewLine);
707 }
708 
710 {
711  static string prefix = "OK:need_subm_progress_msg_notif: ";
712  dump.append(prefix)
714  .append(kNewLine);
715 }
716 
718 {
719  static string prefix = "OK:need_lsnr_progress_msg_notif: ";
720  dump.append(prefix)
722  .append(kNewLine);
723 }
724 
726 {
727  static string prefix = "OK:need_stolen_notif: ";
728  dump.append(prefix)
730  .append(kNewLine);
731 }
732 
733 
735 {
736  ssize_t index = m_Events.size() - 1;
737  while (index >= 0) {
739  break;
740  --index;
741  }
742 
743  --index;
744  if (index < 0)
745  NCBI_THROW(CNetScheduleException, eInternalError,
746  "inconsistency in the job history. "
747  "No reading status found or no event before reading.");
748  return m_Events[index].GetStatus();
749 }
750 
751 
752 void CJob::Dump(FILE * jobs_file) const
753 {
754  // Fill in the job dump structure
755  SJobDump job_dump;
756 
757  job_dump.id = m_Id;
758  job_dump.passport = m_Passport;
759  job_dump.status = (int) m_Status;
760  job_dump.timeout = (double)m_Timeout;
761  job_dump.run_timeout = (double)m_RunTimeout;
762  job_dump.read_timeout = (double)m_ReadTimeout;
763  job_dump.subm_notif_port = m_SubmNotifPort;
764  job_dump.subm_notif_timeout = (double)m_SubmNotifTimeout;
771  job_dump.run_counter = m_RunCount;
772  job_dump.read_counter = m_ReadCount;
773  job_dump.aff_id = m_AffinityId;
774  job_dump.mask = m_Mask;
775  job_dump.group_id = m_GroupId;
776  job_dump.last_touch = (double)m_LastTouch;
777  job_dump.progress_msg_size = m_ProgressMsg.size();
778  job_dump.number_of_events = m_Events.size();
779 
780  job_dump.client_ip_size = m_ClientIP.size();
781  memcpy(job_dump.client_ip, m_ClientIP.data(),
782  min(static_cast<size_t>(kMaxClientIpSize),
783  m_ClientIP.size()));
784  job_dump.client_sid_size = m_ClientSID.size();
785  memcpy(job_dump.client_sid, m_ClientSID.data(),
786  min(static_cast<size_t>(kMaxSessionIdSize),
787  m_ClientSID.size()));
788  job_dump.ncbi_phid_size = m_NCBIPHID.size();
789  memcpy(job_dump.ncbi_phid, m_NCBIPHID.data(),
790  min(static_cast<size_t>(kMaxHitIdSize),
791  m_NCBIPHID.size()));
792 
793  try {
794  job_dump.Write(jobs_file, m_ProgressMsg.data());
795  } catch (const exception & ex) {
796  throw runtime_error("Writing error while dumping a job properties: " +
797  string(ex.what()));
798  }
799 
800  // Fill in the events structure
801  for (vector<CJobEvent>::const_iterator it = m_Events.begin();
802  it != m_Events.end(); ++it) {
803  const CJobEvent & event = *it;
804  SJobEventsDump events_dump;
805 
806  events_dump.event = int(event.m_Event);
807  events_dump.status = int(event.m_Status);
808  events_dump.timestamp = (double)event.m_Timestamp;
809  events_dump.node_addr = event.m_NodeAddr;
810  events_dump.ret_code = event.m_RetCode;
811  events_dump.client_node_size = event.m_ClientNode.size();
812  events_dump.client_session_size = event.m_ClientSession.size();
813  events_dump.err_msg_size = event.m_ErrorMsg.size();
814 
815  try {
816  events_dump.Write(jobs_file, event.m_ClientNode.data(),
817  event.m_ClientSession.data(),
818  event.m_ErrorMsg.data());
819  } catch (const exception & ex) {
820  throw runtime_error("Writing error while dumping a job events: " +
821  string(ex.what()));
822  }
823  }
824 
825  // Fill in the job input/output structure
826  SJobIODump job_io_dump;
827 
828  job_io_dump.input_size = m_Input.size();
829  job_io_dump.output_size = m_Output.size();
830 
831  try {
832  job_io_dump.Write(jobs_file, m_Input.data(), m_Output.data());
833  } catch (const exception & ex) {
834  throw runtime_error("Writing error while dumping a job "
835  "input/output: " + string(ex.what()));
836  }
837 }
838 
839 
840 // true => job loaded
841 // false => EOF
842 // exception => reading problem
843 bool CJob::LoadFromDump(FILE * jobs_file,
844  char * input_buf,
845  char * output_buf,
846  const SJobDumpHeader & header)
847 {
848  SJobDump job_dump;
849  char progress_msg_buf[kNetScheduleMaxDBDataSize];
850 
851  if (job_dump.Read(jobs_file, header.job_props_fixed_size,
852  progress_msg_buf) == 1)
853  return false;
854 
855  // Fill in the job fields
856  m_Id = job_dump.id;
857  m_Passport = job_dump.passport;
858  m_Status = (TJobStatus)job_dump.status;
859  m_Timeout = CNSPreciseTime(job_dump.timeout);
862  m_SubmNotifPort = job_dump.subm_notif_port;
870  m_RunCount = job_dump.run_counter;
871  m_ReadCount = job_dump.read_counter;
872  m_ProgressMsg.clear();
873  if (job_dump.progress_msg_size > 0)
874  m_ProgressMsg = string(progress_msg_buf, job_dump.progress_msg_size);
875  m_AffinityId = job_dump.aff_id;
876  m_Mask = job_dump.mask;
877  m_GroupId = job_dump.group_id;
879  m_ClientIP = string(job_dump.client_ip, min(job_dump.client_ip_size,
881  m_ClientSID = string(job_dump.client_sid, min(job_dump.client_sid_size,
883  m_NCBIPHID = string(job_dump.ncbi_phid, min(job_dump.ncbi_phid_size,
884  kMaxHitIdSize));
885 
886  // Read the job events
887  m_Events.clear();
888  SJobEventsDump event_dump;
889  char client_node_buf[kMaxWorkerNodeIdSize];
890  char client_session_buf[kMaxWorkerNodeIdSize];
891  char err_msg_buf[kNetScheduleMaxDBErrSize];
892  for (size_t k = 0; k < job_dump.number_of_events; ++k) {
893  if (event_dump.Read(jobs_file,
894  header.job_event_fixed_size,
895  client_node_buf, client_session_buf,
896  err_msg_buf) != 0)
897  throw runtime_error("Unexpected end of the dump file. "
898  "Cannot read expected job events." );
899 
900  // Fill the event and append it to the job events
901  CJobEvent event;
902  event.m_Status = (TJobStatus)event_dump.status;
903  event.m_Event = (CJobEvent::EJobEvent)event_dump.event;
904  event.m_Timestamp = CNSPreciseTime(event_dump.timestamp);
905  event.m_NodeAddr = event_dump.node_addr;
906  event.m_RetCode = event_dump.ret_code;
907  event.m_ClientNode.clear();
908  if (event_dump.client_node_size > 0)
909  event.m_ClientNode = string(client_node_buf,
910  event_dump.client_node_size);
911  event.m_ClientSession.clear();
912  if (event_dump.client_session_size > 0)
913  event.m_ClientSession = string(client_session_buf,
914  event_dump.client_session_size);
915  event.m_ErrorMsg.clear();
916  if (event_dump.err_msg_size > 0)
917  event.m_ErrorMsg = string(err_msg_buf,
918  event_dump.err_msg_size);
919 
920  m_Events.push_back(event);
921  }
922 
923 
924  // Read the job input/output
925  SJobIODump io_dump;
926 
927  if (io_dump.Read(jobs_file, header.job_io_fixed_size,
928  input_buf, output_buf) != 0)
929  throw runtime_error("Unexpected end of the dump file. "
930  "Cannot read expected job input/output." );
931 
932  SetInput(string(input_buf, io_dump.input_size));
933  SetOutput(string(output_buf, io_dump.output_size));
934  return true;
935 }
936 
937 
938 
940 
TJobStatus m_Status
Definition: job.hpp:154
EJobEvent
Definition: job.hpp:60
@ eReread
Definition: job.hpp:95
static std::string EventToString(EJobEvent event)
Definition: job.cpp:82
CJobEvent()
Definition: job.cpp:91
const string & GetErrorMsg() const
Definition: job.hpp:123
int GetRetCode() const
Definition: job.hpp:117
EJobEvent m_Event
Definition: job.hpp:155
TJobStatus GetStatusBeforeReading(void) const
Definition: job.cpp:734
bool m_NeedStolenNotif
Definition: job.hpp:443
void x_AppendRunCounter(string &dump) const
Definition: job.cpp:598
void x_AppendReadExpiration(const CNSPreciseTime &read_timeout, const CNSPreciseTime &exp_time, string &dump) const
Definition: job.cpp:472
void x_AppendEraseTime(const CNSPreciseTime &timeout, const CNSPreciseTime &pending_timeout, const CNSPreciseTime &exp_time, string &dump) const
Definition: job.cpp:426
void x_AppendOutput(string &dump) const
Definition: job.cpp:667
void x_AppendRemoteClientIP(string &dump) const
Definition: job.cpp:693
void SetNCBIPHID(const string &ncbi_phid)
Definition: job.hpp:316
CNSPreciseTime m_SubmNotifTimeout
Definition: job.hpp:418
CNSPreciseTime m_ReadTimeout
Definition: job.hpp:415
void SetClientIP(const string &client_ip)
Definition: job.hpp:312
bool ShouldNotifyListener(const CNSPreciseTime &current_time) const
Definition: job.cpp:291
void Dump(FILE *jobs_file) const
Definition: job.cpp:752
CNSPreciseTime GetLastEventTime(void) const
Definition: job.hpp:338
void x_AppendMask(string &dump) const
Definition: job.cpp:650
EAuthTokenCompareResult CompareAuthToken(const string &auth_token) const
Definition: job.cpp:260
void SetOutput(const string &output)
Definition: job.cpp:230
unsigned m_Id
Definition: job.hpp:410
bool m_NeedLsnrProgressMsgNotif
Definition: job.hpp:442
void x_AppendNcbiPhid(string &dump) const
Definition: job.cpp:701
int GetRetCode() const
Definition: job.cpp:216
TJobStatus m_Status
Definition: job.hpp:412
void x_AppendNeedSubmitProgressMsgNotif(string &dump) const
Definition: job.cpp:709
void x_AppendId(string &dump) const
Definition: job.cpp:394
void x_AppendListenerNotifExpiration(string &dump) const
Definition: job.cpp:535
CNSPreciseTime m_ListenerNotifAbsTime
Definition: job.hpp:422
unsigned m_GroupId
Definition: job.hpp:429
void x_AppendSubmitNotifExpiration(string &dump) const
Definition: job.cpp:504
void x_AppendEvents(string &dump) const
Definition: job.cpp:547
const CJobEvent * GetLastEvent() const
Definition: job.cpp:243
void x_AppendProgressMsg(string &dump) const
Definition: job.cpp:676
unsigned int m_Passport
Definition: job.hpp:411
string Print(TDumpFields dump_fields, const CQueue &queue, const CNSAffinityRegistry &aff_registry, const CNSGroupsRegistry &group_registry) const
Definition: job.cpp:300
unsigned short m_SubmNotifPort
Definition: job.hpp:417
string m_NCBIPHID
Definition: job.hpp:434
unsigned m_AffinityId
Definition: job.hpp:427
TJobStatus GetStatus() const
Definition: job.hpp:207
string m_Output
Definition: job.hpp:439
CNSPreciseTime m_Timeout
Definition: job.hpp:413
unsigned short m_ListenerNotifPort
Definition: job.hpp:421
CJobEvent & AppendEvent()
Definition: job.cpp:236
void x_AppendNeedListenerProgressMsgNotif(string &dump) const
Definition: job.cpp:717
string m_ProgressMsg
Definition: job.hpp:426
bool m_NeedSubmProgressMsgNotif
Definition: job.hpp:441
void x_AppendLastTouch(string &dump) const
Definition: job.cpp:418
void x_AppendRunExpiration(const CNSPreciseTime &run_timeout, const CNSPreciseTime &exp_time, string &dump) const
Definition: job.cpp:452
void x_AppendNeedStolenNotif(string &dump) const
Definition: job.cpp:725
void SetInput(const string &input)
Definition: job.cpp:224
vector< CJobEvent > m_Events
Definition: job.hpp:436
bool ShouldNotifySubmitter(const CNSPreciseTime &current_time) const
Definition: job.cpp:281
void x_AppendReadCounter(string &dump) const
Definition: job.cpp:606
string GetErrorMsg() const
Definition: job.cpp:208
CNSPreciseTime m_LastTouch
Definition: job.hpp:430
string m_Input
Definition: job.hpp:438
CJob()
Definition: job.cpp:153
void x_AppendRemoteClientSID(string &dump) const
Definition: job.cpp:685
string m_ClientIP
Definition: job.hpp:432
unsigned m_ReadCount
Definition: job.hpp:425
CNSPreciseTime GetExpirationTime(const CNSPreciseTime &queue_timeout, const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, const CNSPreciseTime &queue_pending_timeout, const CNSPreciseTime &event_time) const
Definition: job.hpp:341
string m_ClientSID
Definition: job.hpp:433
void x_AppendGroup(const CNSGroupsRegistry &group_registry, string &dump) const
Definition: job.cpp:632
EAuthTokenCompareResult
Definition: job.hpp:185
@ eNoMatch
Definition: job.hpp:188
@ eInvalidTokenFormat
Definition: job.hpp:190
@ ePassportOnlyMatch
Definition: job.hpp:187
@ eCompleteMatch
Definition: job.hpp:186
void SetClientSID(const string &client_sid)
Definition: job.hpp:314
void x_AppendStatus(string &dump) const
Definition: job.cpp:410
unsigned m_RunCount
Definition: job.hpp:424
void x_AppendSubmitNotifPort(string &dump) const
Definition: job.cpp:492
unsigned int m_ListenerNotifAddress
Definition: job.hpp:420
void x_AppendKey(const CQueue &queue, string &dump) const
Definition: job.cpp:402
void x_AppendInput(string &dump) const
Definition: job.cpp:658
CNSPreciseTime m_RunTimeout
Definition: job.hpp:414
unsigned m_Mask
Definition: job.hpp:428
void x_AppendAffinity(const CNSAffinityRegistry &aff_registry, string &dump) const
Definition: job.cpp:614
void x_AppendListenerNotif(string &dump) const
Definition: job.cpp:520
bool LoadFromDump(FILE *jobs_file, char *input_buf, char *output_buf, const SJobDumpHeader &header)
Definition: job.cpp:843
string GetTokenByID(unsigned int aff_id) const
unsigned int ResolveGroup(const string &group)
Definition: ns_group.cpp:180
Client API for NCBI NetSchedule server.
NetSchedule internal exception.
string MakeJobKey(unsigned int job_id) const
Definition: ns_queue.cpp:4046
CNSPreciseTime GetTimeout() const
Definition: ns_queue.hpp:761
CNSPreciseTime GetReadTimeout() const
Definition: ns_queue.hpp:769
CNSPreciseTime GetPendingTimeout() const
Definition: ns_queue.hpp:773
CNSPreciseTime GetRunTimeout() const
Definition: ns_queue.hpp:765
#define false
Definition: bool.h:36
static SQLCHAR output[256]
Definition: print.c:5
@ eAffinity
Definition: grid_cli.hpp:194
@ eInput
Definition: grid_cli.hpp:163
string
Definition: cgiapp.hpp:687
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
@ eUnknown
Definition: app_popup.hpp:72
EJobStatus
Job status codes.
static string StatusToString(EJobStatus status)
Printable status type.
@ eReading
Job has its output been reading.
@ eRunning
Running on a worker node.
@ ePending
Waiting for execution.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string gethostbyaddr(unsigned int host, ESwitch log=eOff)
Return empty string on error.
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3944
#define kEmptyStr
Definition: ncbistr.hpp:123
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3452
static size_t StringToSizet(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to size_t.
Definition: ncbistr.cpp:1760
static const string BoolToString(bool value)
Convert bool to string.
Definition: ncbistr.cpp:2806
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
#define dump(b)
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
static string s_EventAsString[]
Definition: job.cpp:54
CNSPreciseTime GetJobExpirationTime(const CNSPreciseTime &last_touch, TJobStatus status, const CNSPreciseTime &job_submit_time, const CNSPreciseTime &job_timeout, const CNSPreciseTime &job_run_timeout, const CNSPreciseTime &job_read_timeout, const CNSPreciseTime &queue_timeout, const CNSPreciseTime &queue_run_timeout, const CNSPreciseTime &queue_read_timeout, const CNSPreciseTime &queue_pending_timeout, const CNSPreciseTime &event_time)
Definition: job.cpp:101
static int input()
int ssize_t
Definition: ncbiconf_msvc.h:93
const unsigned int kNetScheduleMaxDBDataSize
const unsigned int kNetScheduleMaxDBErrSize
T min(T x_, T y_)
long TDumpFields
@ eSubmitNotifPort
@ eSubmitNotifExpiration
@ eListenerNotifExpiration
@ eRunExpiration
@ eProgressMsg
@ eRemoteClientIP
@ eNeedSubmitProgressMsgNotif
@ eRemoteClientSID
@ eNeedListenerProgressMsgNotif
@ eNeedStolenNotif
@ eListenerNotif
@ eReadCounter
@ eReadExpiration
NetSchedule job groups.
const CNSPreciseTime kTimeZero
string NS_FormatPreciseTimeAsSec(const CNSPreciseTime &t)
string NS_FormatPreciseTime(const CNSPreciseTime &t)
const unsigned kMaxHitIdSize
Definition: ns_types.hpp:109
const unsigned kMaxWorkerNodeIdSize
Definition: ns_types.hpp:117
const unsigned kMaxSessionIdSize
Definition: ns_types.hpp:111
static string kNewLine("\n")
const unsigned kMaxClientIpSize
Definition: ns_types.hpp:110
CNetScheduleAPI::EJobStatus TJobStatus
Definition: ns_types.hpp:71
static const char * suffix[]
Definition: pcregrep.c:408
static const char * prefix[]
Definition: pcregrep.c:405
static CNamedPipeClient * client
Uint4 job_event_fixed_size
Definition: ns_db_dump.hpp:83
Uint4 job_props_fixed_size
Definition: ns_db_dump.hpp:81
Uint4 job_io_fixed_size
Definition: ns_db_dump.hpp:82
void Write(FILE *f, const char *progress_msg)
Definition: ns_db_dump.cpp:153
Uint4 run_counter
Definition: ns_db_dump.hpp:137
bool need_lsnr_progress_msg_notif
Definition: ns_db_dump.hpp:134
Uint4 listener_notif_addr
Definition: ns_db_dump.hpp:130
Uint4 mask
Definition: ns_db_dump.hpp:141
double last_touch
Definition: ns_db_dump.hpp:145
double run_timeout
Definition: ns_db_dump.hpp:124
Uint4 subm_notif_port
Definition: ns_db_dump.hpp:127
Uint4 client_sid_size
Definition: ns_db_dump.hpp:150
double read_timeout
Definition: ns_db_dump.hpp:125
Uint4 passport
Definition: ns_db_dump.hpp:121
bool need_subm_progress_msg_notif
Definition: ns_db_dump.hpp:133
Uint4 progress_msg_size
Definition: ns_db_dump.hpp:146
Uint4 aff_id
Definition: ns_db_dump.hpp:140
double subm_notif_timeout
Definition: ns_db_dump.hpp:128
double timeout
Definition: ns_db_dump.hpp:123
char client_sid[kMaxSessionIdSize]
Definition: ns_db_dump.hpp:153
double listener_notif_abstime
Definition: ns_db_dump.hpp:132
Uint4 client_ip_size
Definition: ns_db_dump.hpp:149
int Read(FILE *f, size_t fixed_size_from_header, char *progress_msg)
Definition: ns_db_dump.cpp:168
Uint4 read_counter
Definition: ns_db_dump.hpp:138
Uint4 id
Definition: ns_db_dump.hpp:119
Uint4 number_of_events
Definition: ns_db_dump.hpp:147
char client_ip[kMaxClientIpSize]
Definition: ns_db_dump.hpp:152
Uint4 listener_notif_port
Definition: ns_db_dump.hpp:131
Uint4 ncbi_phid_size
Definition: ns_db_dump.hpp:151
Uint4 group_id
Definition: ns_db_dump.hpp:143
bool need_stolen_notif
Definition: ns_db_dump.hpp:135
char ncbi_phid[kMaxHitIdSize]
Definition: ns_db_dump.hpp:154
Int4 status
Definition: ns_db_dump.hpp:122
void Write(FILE *f, const char *client_node, const char *client_session, const char *err_msg)
Definition: ns_db_dump.cpp:376
Uint4 client_session_size
Definition: ns_db_dump.hpp:197
int Read(FILE *f, size_t fixed_size_from_header, char *client_node, char *client_session, char *err_msg)
Definition: ns_db_dump.cpp:404
Uint4 client_node_size
Definition: ns_db_dump.hpp:196
int Read(FILE *f, size_t fixed_size_from_header, char *input, char *output)
Definition: ns_db_dump.cpp:291
Uint4 output_size
Definition: ns_db_dump.hpp:173
Uint4 input_size
Definition: ns_db_dump.hpp:172
void Write(FILE *f, const char *input, const char *output)
Definition: ns_db_dump.cpp:268
else result
Definition: token2.c:20
Modified on Sun Jul 14 04:57:21 2024 by modify_doxy.py rev. 669887