NCBI C++ ToolKit
ns_handler.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: ns_handler.cpp 94991 2021-09-27 13:56:45Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Anatoliy Kuznetsov, Victor Joukov
27  *
28  * File Description: netschedule commands handler
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 
34 #include <corelib/ncbi_system.hpp>
36 
37 #include "ns_handler.hpp"
38 #include "ns_server.hpp"
39 #include "ns_server_misc.hpp"
40 #include "ns_rollback.hpp"
41 #include "queue_database.hpp"
42 #include "ns_application.hpp"
43 #include "ns_restore_state.hpp"
44 
45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <netinet/tcp.h>
49 
50 
52 
53 
// Frequently used NetSchedule protocol response strings.
// Every complete response line is terminated by kEndOfResponse. The prefix
// constant is defined before the complete "OK" response so that the latter
// can be composed from it: dynamically initialised globals of one
// translation unit are initialised in definition order.
const string kEndOfResponse = "\n";
const string kOKResponsePrefix = "OK:";
const string kOKCompleteResponse = kOKResponsePrefix + kEndOfResponse;
const string kErrNoJobFoundResponse = "ERR:eJobNotFound:" + kEndOfResponse;
59 
60 
61 
62 // NetSchedule command parser
63 //
64 
68  { { "size", eNSPT_Int, eNSPA_Required } } },
71  { NULL }
72 };
73 
75  { "ENDB" },
76  { NULL }
77 };
78 
80  { "input", eNSPT_Str, eNSPA_Required },
81  { "aff", eNSPT_Str, eNSPA_Optional, "" },
82  { "msk", eNSPT_Int, eNSPA_Optional, "0" },
83  { NULL }
84 };
85 
87 
89  eNS_Admin },
90  { { "drain", eNSPT_Int, eNSPA_Optional, "0" },
91  { "ip", eNSPT_Str, eNSPA_Optional, "" },
92  { "sid", eNSPT_Str, eNSPA_Optional, "" },
93  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
95  eNS_NoChecks },
96  { { "effective", eNSPT_Int, eNSPA_Optional, "0" },
97  { "ip", eNSPT_Str, eNSPA_Optional, "" },
98  { "sid", eNSPT_Str, eNSPA_Optional, "" },
99  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
100  { "REFUSESUBMITS", { &CNetScheduleHandler::x_ProcessRefuseSubmits,
101  eNS_Admin },
102  { { "mode", eNSPT_Int, eNSPA_Required },
103  { "ip", eNSPT_Str, eNSPA_Optional, "" },
104  { "sid", eNSPT_Str, eNSPA_Optional, "" },
105  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
107  eNS_Queue },
108  { { "pullback", eNSPT_Int, eNSPA_Optional, "0" },
109  { "ip", eNSPT_Str, eNSPA_Optional, "" },
110  { "sid", eNSPT_Str, eNSPA_Optional, "" },
111  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
112  { "QRESUME", { &CNetScheduleHandler::x_ProcessResume,
113  eNS_Queue },
114  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
115  { "sid", eNSPT_Str, eNSPA_Optional, "" },
116  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
118  eNS_NoChecks },
119  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
120  { "sid", eNSPT_Str, eNSPA_Optional, "" },
121  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
123  eNS_NoChecks },
124  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
125  { "sid", eNSPT_Str, eNSPA_Optional, "" },
126  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
128  eNS_NoChecks },
129  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
130  { "sid", eNSPT_Str, eNSPA_Optional, "" },
131  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
133  eNS_NoChecks },
134  { { "alert", eNSPT_Id, eNSPA_Required },
135  { "user", eNSPT_Str, eNSPA_Required },
136  { "ip", eNSPT_Str, eNSPA_Optional, "" },
137  { "sid", eNSPT_Str, eNSPA_Optional, "" },
138  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
140  eNS_NoChecks },
141  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
142  { "sid", eNSPT_Str, eNSPA_Optional, "" },
143  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
145  eNS_NoChecks },
146  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
147  { "sid", eNSPT_Str, eNSPA_Optional, "" },
148  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
150  eNS_NoChecks },
151  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
152  { "sid", eNSPT_Str, eNSPA_Optional, "" },
153  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
155  eNS_NoChecks },
156  { { "qname", eNSPT_Id, eNSPA_Required },
157  { "ip", eNSPT_Str, eNSPA_Optional, "" },
158  { "sid", eNSPT_Str, eNSPA_Optional, "" },
159  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
161  eNS_NoChecks },
162  { { "qname", eNSPT_Id, eNSPA_Optional, "" },
163  { "service", eNSPT_Id, eNSPA_Optional, "" },
164  { "ip", eNSPT_Str, eNSPA_Optional, "" },
165  { "sid", eNSPT_Str, eNSPA_Optional, "" },
166  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
168  eNS_NoChecks },
169  { { "qname", eNSPT_Id, eNSPA_Optional },
170  { "ip", eNSPT_Str, eNSPA_Optional, "" },
171  { "sid", eNSPT_Str, eNSPA_Optional, "" },
172  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
174  eNS_Queue },
175  { { "scope", eNSPT_Id, eNSPA_Optional },
176  { "ip", eNSPT_Str, eNSPA_Optional, "" },
177  { "sid", eNSPT_Str, eNSPA_Optional, "" },
178  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
181  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
182  { "sid", eNSPT_Str, eNSPA_Optional, "" },
183  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
184  /* QCRE checks 'program' and 'submit hosts' from the class */
186  eNS_NoChecks },
187  { { "qname", eNSPT_Id, eNSPA_Required },
188  { "qclass", eNSPT_Id, eNSPA_Required },
189  { "description", eNSPT_Str, eNSPA_Optional },
190  { "ip", eNSPT_Str, eNSPA_Optional, "" },
191  { "sid", eNSPT_Str, eNSPA_Optional, "" },
192  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
193  /* QDEL checks 'program' and 'submit hosts' from the queue it deletes */
195  eNS_NoChecks },
196  { { "qname", eNSPT_Id, eNSPA_Required },
197  { "ip", eNSPT_Str, eNSPA_Optional, "" },
198  { "sid", eNSPT_Str, eNSPA_Optional, "" },
199  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
201  eNS_Queue },
202  { { "job_key", eNSPT_Id, eNSPA_Required },
203  { "ip", eNSPT_Str, eNSPA_Optional, "" },
204  { "sid", eNSPT_Str, eNSPA_Optional, "" },
205  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
206  { "STATUS2", { &CNetScheduleHandler::x_ProcessStatus,
207  eNS_Queue },
208  { { "job_key", eNSPT_Id, eNSPA_Required },
209  { "ip", eNSPT_Str, eNSPA_Optional, "" },
210  { "sid", eNSPT_Str, eNSPA_Optional, "" },
211  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
212  { "need_progress_msg", eNSPT_Str, eNSPA_Optional, "0" } } },
213  /* The STAT command makes sense with and without a queue */
215  eNS_NoChecks },
216  { { "option", eNSPT_Id, eNSPA_Optional },
217  { "comment", eNSPT_Id, eNSPA_Optional },
218  { "aff", eNSPT_Str, eNSPA_Optional },
219  { "group", eNSPT_Str, eNSPA_Optional },
220  { "ip", eNSPT_Str, eNSPA_Optional, "" },
221  { "sid", eNSPT_Str, eNSPA_Optional, "" },
222  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
225  { { "job_key", eNSPT_Id, eNSPA_Required },
226  { "progress_msg", eNSPT_Str, eNSPA_Required },
227  { "ip", eNSPT_Str, eNSPA_Optional, "" },
228  { "sid", eNSPT_Str, eNSPA_Optional, "" },
229  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
232  { { "job_key", eNSPT_Id, eNSPA_Required },
233  { "ip", eNSPT_Str, eNSPA_Optional, "" },
234  { "sid", eNSPT_Str, eNSPA_Optional, "" },
235  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
237  eNS_Queue },
238  { { "job_key", eNSPT_Id, eNSPA_Optional },
239  { "status", eNSPT_Str, eNSPA_Optional, "" },
240  { "start_after", eNSPT_Id, eNSPA_Optional },
241  { "count", eNSPT_Int, eNSPA_Optional, "0" },
242  { "group", eNSPT_Str, eNSPA_Optional, "" },
243  { "aff", eNSPT_Str, eNSPA_Optional, "" },
244  { "ip", eNSPT_Str, eNSPA_Optional, "" },
245  { "sid", eNSPT_Str, eNSPA_Optional, "" },
246  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
247  { "fields", eNSPT_Str, eNSPA_Optional, "" },
248  { "order", eNSPT_Str, eNSPA_Optional, "first" } } },
250  eNS_Queue },
251  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
252  { "sid", eNSPT_Str, eNSPA_Optional, "" },
253  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
255  eNS_Queue },
256  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
257  { "sid", eNSPT_Str, eNSPA_Optional, "" },
258  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
260  eNS_Queue },
261  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
262  { "sid", eNSPT_Str, eNSPA_Optional, "" },
263  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
266  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
267  { "sid", eNSPT_Str, eNSPA_Optional, "" },
268  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
271  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
272  { "sid", eNSPT_Str, eNSPA_Optional, "" },
273  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
276  { { "job_key", eNSPT_Id, eNSPA_Required },
277  { "ip", eNSPT_Str, eNSPA_Optional, "" },
278  { "sid", eNSPT_Str, eNSPA_Optional, "" },
279  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
282  { { "job_key", eNSPT_Id, eNSPA_Required },
283  { "ip", eNSPT_Str, eNSPA_Optional, "" },
284  { "sid", eNSPT_Str, eNSPA_Optional, "" },
285  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
286  { "need_progress_msg", eNSPT_Str, eNSPA_Optional, "0" } } },
289  { { "input", eNSPT_Str, eNSPA_Required },
290  { "port", eNSPT_Int, eNSPA_Optional },
291  { "timeout", eNSPT_Int, eNSPA_Optional },
292  { "aff", eNSPT_Str, eNSPA_Optional, "" },
293  { "msk", eNSPT_Int, eNSPA_Optional, "0" },
294  { "ip", eNSPT_Str, eNSPA_Optional, "" },
295  { "sid", eNSPT_Str, eNSPA_Optional, "" },
296  { "group", eNSPT_Str, eNSPA_Optional, "" },
297  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
298  { "need_progress_msg", eNSPT_Str, eNSPA_Optional, "0" } } },
301  { { "job_key", eNSPT_Id, eNSPA_Optional },
302  { "group", eNSPT_Str, eNSPA_Optional, "" },
303  { "aff", eNSPT_Str, eNSPA_Optional, "" },
304  { "status", eNSPT_Str, eNSPA_Optional, "" },
305  { "ip", eNSPT_Str, eNSPA_Optional, "" },
306  { "sid", eNSPT_Str, eNSPA_Optional, "" },
307  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
310  { { "job_key", eNSPT_Id, eNSPA_Required },
311  { "port", eNSPT_Int, eNSPA_Required },
312  { "timeout", eNSPT_Int, eNSPA_Required },
313  { "ip", eNSPT_Str, eNSPA_Optional, "" },
314  { "sid", eNSPT_Str, eNSPA_Optional, "" },
315  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
316  { "need_stolen", eNSPT_Str, eNSPA_Optional, "0" },
317  { "need_progress_msg", eNSPT_Str, eNSPA_Optional, "0" } } },
320  { { "port", eNSPT_Int, eNSPA_Optional },
321  { "timeout", eNSPT_Int, eNSPA_Optional },
322  { "ip", eNSPT_Str, eNSPA_Optional, "" },
323  { "sid", eNSPT_Str, eNSPA_Optional, "" },
324  { "group", eNSPT_Str, eNSPA_Optional, "" },
325  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
328  { { "aff", eNSPT_Str, eNSPA_Optional, "" },
329  { "port", eNSPT_Int, eNSPA_Optional },
330  { "timeout", eNSPT_Int, eNSPA_Optional, },
331  { "group", eNSPT_Str, eNSPA_Optional, "" },
332  { "ip", eNSPT_Str, eNSPA_Optional, "" },
333  { "sid", eNSPT_Str, eNSPA_Optional, "" },
334  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
335  { "affinity_may_change",
336  eNSPT_Int, eNSPA_Optional, "0" },
337  { "group_may_change", eNSPT_Int, eNSPA_Optional, "0" } } },
340  { { "reader_aff", eNSPT_Int, eNSPA_Required, "0" },
341  { "any_aff", eNSPT_Int, eNSPA_Required, "0" },
342  { "exclusive_new_aff", eNSPT_Int, eNSPA_Optional, "0" },
343  { "aff", eNSPT_Str, eNSPA_Optional, "" },
344  { "port", eNSPT_Int, eNSPA_Optional },
345  { "timeout", eNSPT_Int, eNSPA_Optional, },
346  { "group", eNSPT_Str, eNSPA_Optional, "" },
347  { "ip", eNSPT_Str, eNSPA_Optional, "" },
348  { "sid", eNSPT_Str, eNSPA_Optional, "" },
349  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
350  { "affinity_may_change",
351  eNSPT_Int, eNSPA_Optional, "0" },
352  { "group_may_change", eNSPT_Int, eNSPA_Optional, "0" },
353  { "prioritized_aff", eNSPT_Int, eNSPA_Optional, "0" } } },
356  { { "job_key", eNSPT_Id, eNSPA_Required },
357  { "auth_token", eNSPT_Id, eNSPA_Required },
358  { "ip", eNSPT_Str, eNSPA_Optional, "" },
359  { "sid", eNSPT_Str, eNSPA_Optional, "" },
360  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
363  { { "job_key", eNSPT_Id, eNSPA_Required },
364  { "auth_token", eNSPT_Id, eNSPA_Required },
365  { "err_msg", eNSPT_Str, eNSPA_Optional },
366  { "ip", eNSPT_Str, eNSPA_Optional, "" },
367  { "sid", eNSPT_Str, eNSPA_Optional, "" },
368  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
369  { "no_retries", eNSPT_Int, eNSPA_Optional, "0" } } },
372  { { "job_key", eNSPT_Id, eNSPA_Required },
373  { "auth_token", eNSPT_Str, eNSPA_Required },
374  { "ip", eNSPT_Str, eNSPA_Optional, "" },
375  { "sid", eNSPT_Str, eNSPA_Optional, "" },
376  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
377  { "blacklist", eNSPT_Int, eNSPA_Optional, "1" } } },
380  { { "job_key", eNSPT_Id, eNSPA_Required },
381  { "ip", eNSPT_Str, eNSPA_Optional, "" },
382  { "sid", eNSPT_Str, eNSPA_Optional, "" },
383  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
385  eNS_Queue },
386  { { "job_key", eNSPT_Id, eNSPA_Required },
387  { "ip", eNSPT_Str, eNSPA_Optional, "" },
388  { "sid", eNSPT_Str, eNSPA_Optional, "" },
389  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
391  eNS_Queue },
392  { { "job_key", eNSPT_Id, eNSPA_Required },
393  { "ip", eNSPT_Str, eNSPA_Optional, "" },
394  { "sid", eNSPT_Str, eNSPA_Optional, "" },
395  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
396  { "need_progress_msg", eNSPT_Str, eNSPA_Optional, "0" } } },
399  { { "add", eNSPT_Str, eNSPA_Optional, "" },
400  { "del", eNSPT_Str, eNSPA_Optional, "" },
401  { "ip", eNSPT_Str, eNSPA_Optional, "" },
402  { "sid", eNSPT_Str, eNSPA_Optional, "" },
403  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
406  { { "add", eNSPT_Str, eNSPA_Optional, "" },
407  { "del", eNSPT_Str, eNSPA_Optional, "" },
408  { "ip", eNSPT_Str, eNSPA_Optional, "" },
409  { "sid", eNSPT_Str, eNSPA_Optional, "" },
410  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
413  { { "aff", eNSPT_Str, eNSPA_Optional, "" },
414  { "ip", eNSPT_Str, eNSPA_Optional, "" },
415  { "sid", eNSPT_Str, eNSPA_Optional, "" },
416  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
419  { { "aff", eNSPT_Str, eNSPA_Optional, "" },
420  { "ip", eNSPT_Str, eNSPA_Optional, "" },
421  { "sid", eNSPT_Str, eNSPA_Optional, "" },
422  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
425  { { "port", eNSPT_Id, eNSPA_Optional },
426  { "aff", eNSPT_Str, eNSPA_Optional, "" },
427  { "ip", eNSPT_Str, eNSPA_Optional, "" },
428  { "sid", eNSPT_Str, eNSPA_Optional, "" },
429  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
432  { { "wnode_aff", eNSPT_Int, eNSPA_Required, "0" },
433  { "any_aff", eNSPT_Int, eNSPA_Required, "0" },
434  { "exclusive_new_aff", eNSPT_Int, eNSPA_Optional, "0" },
435  { "aff", eNSPT_Str, eNSPA_Optional, "" },
436  { "port", eNSPT_Int, eNSPA_Optional },
437  { "timeout", eNSPT_Int, eNSPA_Optional },
438  { "group", eNSPT_Str, eNSPA_Optional, "" },
439  { "ip", eNSPT_Str, eNSPA_Optional, "" },
440  { "sid", eNSPT_Str, eNSPA_Optional, "" },
441  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
442  { "prioritized_aff", eNSPT_Int, eNSPA_Optional, "0" } } },
445  { { "job_key", eNSPT_Id, eNSPA_Required },
446  { "job_return_code", eNSPT_Id, eNSPA_Required },
447  { "output", eNSPT_Str, eNSPA_Required },
448  { "ip", eNSPT_Str, eNSPA_Optional, "" },
449  { "sid", eNSPT_Str, eNSPA_Optional, "" },
450  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
453  { { "job_key", eNSPT_Id, eNSPA_Required },
454  { "auth_token", eNSPT_Id, eNSPA_Required },
455  { "job_return_code", eNSPT_Id, eNSPA_Required },
456  { "output", eNSPT_Str, eNSPA_Required },
457  { "ip", eNSPT_Str, eNSPA_Optional, "" },
458  { "sid", eNSPT_Str, eNSPA_Optional, "" },
459  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
462  { { "job_key", eNSPT_Id, eNSPA_Required },
463  { "ip", eNSPT_Str, eNSPA_Optional, "" },
464  { "sid", eNSPT_Str, eNSPA_Optional, "" },
465  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
466  { "RETURN2", { &CNetScheduleHandler::x_ProcessReturn,
468  { { "job_key", eNSPT_Id, eNSPA_Required },
469  { "auth_token", eNSPT_Id, eNSPA_Required },
470  { "blacklist", eNSPT_Int, eNSPA_Optional, "1" },
471  { "ip", eNSPT_Str, eNSPA_Optional, "" },
472  { "sid", eNSPT_Str, eNSPA_Optional, "" },
473  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
474  { "RESCHEDULE", { &CNetScheduleHandler::x_ProcessReschedule,
476  { { "job_key", eNSPT_Id, eNSPA_Required },
477  { "auth_token", eNSPT_Id, eNSPA_Required },
478  { "aff", eNSPT_Str, eNSPA_Optional, "" },
479  { "group", eNSPT_Str, eNSPA_Optional, "" },
480  { "ip", eNSPT_Str, eNSPA_Optional, "" },
481  { "sid", eNSPT_Str, eNSPA_Optional, "" },
482  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
485  { { "job_key", eNSPT_Id, eNSPA_Required },
486  { "ip", eNSPT_Str, eNSPA_Optional, "" },
487  { "sid", eNSPT_Str, eNSPA_Optional, "" },
488  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
491  { { "port", eNSPT_Int, eNSPA_Required },
492  { "timeout", eNSPT_Int, eNSPA_Required },
493  { "aff", eNSPT_Str, eNSPA_Optional, "" },
494  { "ip", eNSPT_Str, eNSPA_Optional, "" },
495  { "sid", eNSPT_Str, eNSPA_Optional, "" },
496  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
499  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
500  { "sid", eNSPT_Str, eNSPA_Optional, "" },
501  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
504  { { "ip", eNSPT_Str, eNSPA_Optional, "" },
505  { "sid", eNSPT_Str, eNSPA_Optional, "" },
506  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
509  { { "job_key", eNSPT_Id, eNSPA_Required },
510  { "err_msg", eNSPT_Str, eNSPA_Required },
511  { "output", eNSPT_Str, eNSPA_Required },
512  { "job_return_code", eNSPT_Int, eNSPA_Required },
513  { "ip", eNSPT_Str, eNSPA_Optional, "" },
514  { "sid", eNSPT_Str, eNSPA_Optional, "" },
515  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
518  { { "job_key", eNSPT_Id, eNSPA_Required },
519  { "auth_token", eNSPT_Id, eNSPA_Required },
520  { "err_msg", eNSPT_Str, eNSPA_Required },
521  { "output", eNSPT_Str, eNSPA_Required },
522  { "job_return_code", eNSPT_Int, eNSPA_Required },
523  { "ip", eNSPT_Str, eNSPA_Optional, "" },
524  { "sid", eNSPT_Str, eNSPA_Optional, "" },
525  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" },
526  { "no_retries", eNSPT_Int, eNSPA_Optional, "0" } } },
529  { { "job_key", eNSPT_Id, eNSPA_Optional },
530  { "job_return_code", eNSPT_Int, eNSPA_Optional },
531  { "output", eNSPT_Str, eNSPA_Optional },
532  { "aff", eNSPT_Str, eNSPA_Optional, "" },
533  { "ip", eNSPT_Str, eNSPA_Optional, "" },
534  { "sid", eNSPT_Str, eNSPA_Optional, "" },
535  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
538  { { "job_key", eNSPT_Id, eNSPA_Required },
539  { "timeout", eNSPT_Int, eNSPA_Required },
540  { "ip", eNSPT_Str, eNSPA_Optional, "" },
541  { "sid", eNSPT_Str, eNSPA_Optional, "" },
542  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
545  { { "job_key", eNSPT_Id, eNSPA_Required },
546  { "timeout", eNSPT_Int, eNSPA_Required },
547  { "ip", eNSPT_Str, eNSPA_Optional, "" },
548  { "sid", eNSPT_Str, eNSPA_Optional, "" },
549  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
550 
551  { "SETCLIENTDATA", { &CNetScheduleHandler::x_ProcessSetClientData,
552  eNS_Queue },
553  { { "data", eNSPT_Str, eNSPA_Required },
554  { "version", eNSPT_Int, eNSPA_Optional, "-1" },
555  { "ip", eNSPT_Str, eNSPA_Optional, "" },
556  { "sid", eNSPT_Str, eNSPA_Optional, "" },
557  { "ncbi_phid", eNSPT_Str, eNSPA_Optional, "" } } },
558 
559  // Obsolete commands
561  eNS_NoChecks } },
563  eNS_NoChecks } },
565  eNS_NoChecks } },
567  eNS_NoChecks } },
569  eNS_NoChecks } },
570  { NULL },
571 };
572 
573 
575  { "client", eNSPT_Str, eNSPA_Optional, "Unknown client" },
576  { "params", eNSPT_Str, eNSPA_Ellipsis },
577  { NULL }
578 };
579 
580 
581 
583 {}
584 
586 {
588 }
589 
590 
591 
/// Append-to-string callback with a C-style user-data signature.
///
/// @param data  Opaque user pointer; must point to a `string` that receives
///              the bytes.
/// @param ptr   Source bytes; they need not be NUL-terminated and may
///              contain embedded '\0' characters.
/// @param size  Number of bytes available at `ptr`.
/// @return      Always `size`: every offered byte is consumed.
static size_t s_BufReadHelper(void* data, const void* ptr, size_t size)
{
    // Named casts document intent: void* -> string* and
    // const void* -> const char* are plain static conversions.
    string* const dst = static_cast<string*>(data);

    // Nothing to copy for an empty chunk; this also avoids handing a
    // possibly-null `ptr` to string::append.
    if (size != 0)
        dst->append(static_cast<const char*>(ptr), size);

    return size;
}
597 
598 
// Replaces the contents of `str` with the data currently held in `buf`
// and then drops that data from `buf`.
// NOTE(review): `str` is expected to be filled from `buf` before the
// BUF_Read() call below (presumably through s_BufReadHelper as the
// append-to-string callback) — confirm.
599 static void s_ReadBufToString(BUF buf, string& str)
600 {
    // Number of bytes currently stored in the buffer
601  size_t size = BUF_Size(buf);
602 
    // Start from an empty string with room for the whole buffer
603  str.erase();
604  str.reserve(size);
605 
    // A NULL destination means the `size` bytes are read out of `buf` and
    // discarded (CONNECT BUF API semantics)
607  BUF_Read(buf, NULL, size);
608 }
609 
610 
612  : m_MsgBufferSize(kInitialMessageBufferSize),
613  m_MsgBuffer(new char[kInitialMessageBufferSize]),
614  m_Server(server),
615  m_ProcessMessage(NULL),
616  m_BatchSize(0),
617  m_BatchPos(0),
618  m_BatchSubmPort(0),
619  m_WithinBatchSubmit(false),
620  m_SingleCmdParser(sm_CommandMap),
621  m_BatchHeaderParser(sm_BatchHeaderMap),
622  m_BatchEndParser(sm_BatchEndMap),
623  m_ClientIdentificationPrinted(false),
624  m_RollbackAction(NULL)
625 {}
626 
627 
629 {
631  delete [] m_MsgBuffer;
632 }
633 
634 
636 {
637  CSocket & socket = GetSocket();
639 
640  socket.DisableOSSendDelay();
641  socket.SetTimeout(eIO_ReadWrite, &to);
643 
645 
646  // Log the fact of opened connection if needed
647  if (m_Server->IsLog()) {
648  CRequestContextResetter context_resetter;
650  }
651 }
652 
653 
655 {}
656 
657 
659 {
660  if (m_WithinBatchSubmit) {
661  m_WithinBatchSubmit = false;
663  }
664 
665  // It's possible that this method will be called before OnOpen - when
666  // connection is just created and server is shutting down. In this case
667  // OnOpen will never be called.
668  //
669  // m_ConnContext != NULL also tells that the logging is required
670  if (m_ConnContext.IsNull())
671  return;
672 
673  switch (peer)
674  {
676  // All the places where the server closes the connection make sure
677  // that the conn and cmd request statuses are set approprietly
678  break;
680  // All the commands are synchronous so there is no need to set the
681  // request status here.
682  break;
683  }
684 
685 
686  // If a command has not finished its logging by some reasons - do it
687  // here as the last chance.
688  if (m_CmdContext.NotNull()) {
693  }
694 
695  CSocket & socket = GetSocket();
696  TNCBI_BigCount read_count = socket.GetCount(eIO_Read);
697  TNCBI_BigCount write_count = socket.GetCount(eIO_Write);
698 
699  m_ConnContext->SetBytesRd(read_count);
700  m_ConnContext->SetBytesWr(write_count);
701 
702  // Call the socket shutdown only if it was a client close and
703  // there was some data exchange over the connection.
704  // LBSMD - when it checks a connection point - opens and aborts the
705  // connection without any data transferring and in this scenario the
706  // socket.Shutdown() call returns non success.
708  (read_count > 0 || write_count > 0)) {
709 
710  // The socket.Shutdown() call will fail if there is some not delivered
711  // data in the socket or if the connection was not closed properly on
712  // the client side.
714  if (status != EIO_Status::eIO_Success) {
716  ERR_POST("Unseccessfull client socket shutdown. "
717  "The socket may have data not delivered to the client. "
718  "Error code: " << status << ": " << IO_StatusStr(status));
719  }
720  }
721 
726 }
727 
728 
730 {
731  CRequestContextResetter context_resetter;
733 
734  if (m_ConnContext.NotNull())
736 }
737 
738 
740 {
741  CRequestContextResetter context_resetter;
743 
744  switch (reason) {
746  ERR_POST("eCommunicationError:Connection pool full");
747  break;
749  ERR_POST("eCommunicationError:Unpollable connection");
750  break;
752  ERR_POST("eCommunicationError:Request queue full");
753  break;
754  default:
755  ERR_POST("eCommunicationError:Unknown overflow error");
756  break;
757  }
758 }
759 
760 
762 {
763  CRequestContextResetter context_resetter;
765 
766  if (m_Server->ShutdownRequested()) {
767  ERR_POST("NetSchedule is shutting down. Client input rejected.");
769  if (x_WriteMessage("ERR:eShuttingDown:NetSchedule server "
770  "is shutting down. Session aborted." +
773  return;
774  }
775 
776  bool error = true;
777  string error_client_message;
778  unsigned int error_code;
779 
780  try {
781  // Single line user input processor
782  (this->*m_ProcessMessage)(buffer);
783  error = false;
784  }
785  catch (const CNetScheduleException & ex) {
787  ERR_POST(ex);
788  if (m_CmdContext.NotNull())
790  else if (m_ConnContext.NotNull())
792 
793  if (x_WriteMessage("ERR:" + string(ex.GetErrCodeString()) +
794  ":" + ex.GetMsg() +
797  return;
798  }
799  ERR_POST(ex);
800  error_client_message = "ERR:" + string(ex.GetErrCodeString()) +
801  ":" + ex.GetMsg();
802  error_code = ex.ErrCodeToHTTPStatusCode();
803  }
804  catch (const CNSProtoParserException & ex) {
805  ERR_POST(ex);
806  error_client_message = "ERR:" + string(ex.GetErrCodeString()) +
807  ":" + ex.GetMsg();
808  error_code = eStatus_BadRequest;
809  }
810  catch (const CNetServiceException & ex) {
811  ERR_POST(ex);
812  error_client_message = "ERR:" + string(ex.GetErrCodeString()) +
813  ":" + ex.GetMsg();
815  error_code = eStatus_SocketIOError;
816  else
817  error_code = eStatus_ServerError;
818  }
819  catch (const exception & ex) {
820  ERR_POST("STL exception: " << ex.what());
821  error_client_message = "ERR:" +
822  NStr::PrintableString("eInternalError:Internal "
823  "error - " + string(ex.what()));
824  error_code = eStatus_ServerError;
825  }
826  catch (...) {
827  ERR_POST("ERR:Unknown server exception.");
828  error_client_message = "ERR:eInternalError:Unknown server exception.";
829  error_code = eStatus_ServerError;
830  }
831 
832  if (error) {
833  x_SetCmdRequestStatus(error_code);
834  x_WriteMessage(error_client_message + kEndOfResponse);
836  }
837 }
838 
839 
840 void CNetScheduleHandler::OnError(const string & err_message)
841 {
842  ERR_POST(Warning << err_message);
843 }
844 
845 
847 {
848  int fd = 0;
849  int val = 1;
850 
851  GetSocket().GetOSHandle(&fd, sizeof(fd));
852  setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &val, sizeof(val));
853 }
854 
855 
856 
859  size_t msg_size,
860  size_t required_size)
861 {
    // Copies the first msg_size bytes of msg into m_MsgBuffer (growing the
    // buffer when required_size does not fit) and stores the end-of-message
    // '\n' at index required_size-1. The buffer is NOT NUL-terminated.
    // Returns eIO_Success, or eIO_Closed from the debug-only error emulation.
862  if (required_size > m_MsgBufferSize) {
863  delete [] m_MsgBuffer;
864  while (required_size > m_MsgBufferSize)
866  m_MsgBuffer = new char[m_MsgBufferSize];
867  }
868 
869  memcpy(m_MsgBuffer, msg.data(), msg_size);
870  m_MsgBuffer[required_size-1] = '\n';
871 
    // Error emulation below is compiled only into debug builds.
872  #if defined(_DEBUG) && !defined(NDEBUG)
873  if (m_ConnContext.NotNull()) {
875  if (err_emul.IsActive()) {
876  if (err_emul.as_double > 0.0) {
877  CNSPreciseTime delay(err_emul.as_double);
878 
879  // Non signal safe but good enough for error simulation
880  nanosleep(&delay, NULL);
881  }
882  }
883 
885  if (err_emul.IsActive()) {
886  if (err_emul.as_bool) {
888  return eIO_Closed;
889  }
890  }
891 
892  err_emul = m_Server->GetDebugReplyWithGarbage();
893  if (err_emul.IsActive()) {
894  if (err_emul.as_bool) {
895  string value = m_Server->GetDebugGarbage();
896 
            // From here on msg_size indexes `value`, so trailing newlines
            // must be trimmed from `value` itself. Testing `msg` here (as
            // before) could read past the end of the original message.
            // NOTE(review): msg_size/required_size are by-value parameters,
            // so the caller still writes its own (original) length from the
            // buffer after this substitution — confirm this is intended.
897  msg_size = value.size();
898  while (msg_size >= 1 && value[msg_size-1] == '\n')
899  --msg_size;
900  required_size = msg_size + 1;
901 
902  if (required_size > m_MsgBufferSize) {
903  delete [] m_MsgBuffer;
904  while (required_size > m_MsgBufferSize)
906  m_MsgBuffer = new char[m_MsgBufferSize];
907  }
908 
909  memcpy(m_MsgBuffer, value.c_str(), msg_size);
910  m_MsgBuffer[required_size-1] = '\n';
911  }
912  }
913  }
914  #endif
915 
916  return eIO_Success;
917 }
918 
919 
921 {
    // Sends `msg` to the client as exactly one line: any run of trailing
    // '\n' characters is collapsed into a single end-of-message '\n'.
    // Returns the socket write status, a non-success status from
    // x_PrepareWriteBuffer(), or eIO_Closed from the debug-only error
    // emulation. A failed write is reported via
    // x_HandleSocketErrorOnResponse().
922  size_t msg_size = msg.size();
923  bool has_eom = false;
924 
925  while (msg_size >= 1 && msg[msg_size-1] == '\n') {
926  --msg_size;
927  has_eom = true;
928  }
929 
930  size_t required_size = msg_size + 1;
931  const char * msg_buf = NULL;
932 
    // When msg already ends with '\n', msg[msg_size] is that '\n', so the
    // first required_size bytes of msg can be sent without copying.
    // Otherwise the text is copied into m_MsgBuffer with a '\n' appended.
933  if (has_eom)
934  msg_buf = msg.data();
935  else {
936  EIO_Status status = x_PrepareWriteBuffer(msg, msg_size, required_size);
937  if (status != eIO_Success)
938  return status;
939  msg_buf = m_MsgBuffer;
940  }
941 
942  // Write to the socket as a single transaction
943  size_t written = 0;
944  CNSPreciseTime write_start = CNSPreciseTime::Current();
945  EIO_Status result = GetSocket().Write(msg_buf, required_size, &written);
946  if (result == eIO_Success) {
947  #if defined(_DEBUG) && !defined(NDEBUG)
948  if (m_ConnContext.NotNull()) {
949  SErrorEmulatorParameter err_emul =
951  if (err_emul.IsActive()) {
952  if (err_emul.as_bool) {
954  return eIO_Closed;
955  }
956  }
957  }
958  #endif
959  } else {
960  CNSPreciseTime write_timing = CNSPreciseTime::Current() - write_start;
        // m_MsgBuffer is never NUL-terminated, so an implicit
        // const char* -> string conversion would scan past the written
        // bytes. Build the reported message with an explicit length.
961  x_HandleSocketErrorOnResponse(string(msg_buf, required_size),
         result, written, write_timing);
962  }
963  return result;
964 }
965 
966 
968  const string & msg,
969  EIO_Status write_result,
970  size_t written_bytes,
971  const CNSPreciseTime & timing)
972 {
973  // There was an error of writing into a socket, so rollback the action if
974  // necessary and close the connection
975  string report =
976  "Error writing message to the client. "
977  "Peer: " + GetSocket().GetPeerAddress() + ". "
978  "Socket write error status: " + IO_StatusStr(write_result) + ". "
979  "Written bytes: " + to_string(written_bytes) + ". "
980  "Socket write timing: " + to_string(double(timing)) + ". "
981  "Message begins with: ";
982  if (msg.size() > 32)
983  report += msg.substr(0, 32) + " (TRUNCATED)";
984  else
985  report += msg;
986  ERR_POST(report);
987 
988  if (m_ConnContext.NotNull()) {
991  if (m_CmdContext.NotNull()) {
994  }
995  }
996 
997  try {
998  if (!m_QueueName.empty()) {
999  CRef<CQueue> ref = GetQueue();
1000  ref->RegisterSocketWriteError(m_ClientId);
1002  }
1003  } catch (...) {}
1004 
1006 }
1007 
1008 
1010 {
1011  unsigned int peer_addr;
1012 
1013  GetSocket().GetPeerAddress(&peer_addr, 0, eNH_NetworkByteOrder);
1014 
1015  // always use localhost(127.0*) address for clients coming from
1016  // the same net address (sometimes it can be 127.* or full address)
1017  if (peer_addr == m_Server->GetHostNetAddr())
1019  return peer_addr;
1020 }
1021 
1022 
1024 {
1025  // This should only memorize the received string.
1026  // The x_ProcessMsgQueue(...) will parse it.
1027  // This is done to avoid copying parsed parameters and to have exactly one
1028  // diagnostics extra with both auth parameters and queue name.
1030 
1031  // Check if it was systems probe...
1032  if (strncmp(m_RawAuthString.c_str(), "GET / HTTP/1.", 13) == 0) {
1033  // That was systems probing ports
1034 
1037  return;
1038  }
1039 
1042 }
1043 
1044 
1046 {
1048 
1049  // Parse saved raw authorization string and produce log output
1050  TNSProtoParams params;
1051 
1052  try {
1054  }
1055  catch (CNSProtoParserException & ex) {
1056  string msg = "Error authenticating client: '";
1057  if (m_RawAuthString.size() > 128)
1058  msg += string(m_RawAuthString.c_str(), 128) + " (TRUNCATED)";
1059  else
1060  msg += m_RawAuthString;
1061  msg += "': ";
1062 
1063  // This will form request context with the client IP etc.
1064  CRequestContextResetter context_resetter;
1065  if (m_ConnContext.IsNull())
1067 
1068  // ex.what() is here to avoid unnecessery records in the log
1069  // if it is simple 'ex' -> 2 records are produced
1070  ERR_POST(msg << ex.what());
1071 
1074  return;
1075  }
1076 
1077  // Memorize what we know about the client
1078  m_ClientId.Update(this->x_GetPeerAddress(), params);
1079 
1080 
1081  // Test if it is an administrative user and memorize it
1085 
1086  // Produce the log output if required
1087  if (x_NeedCmdLogging()) {
1088  CDiagContext_Extra diag_extra = GetDiagContext().Extra();
1089  diag_extra.Print("queue", m_QueueName);
1090  for (const auto & param : params) {
1091  if (param.first == "status") {
1092  diag_extra.Print("job_status", param.second);
1093  } else {
1094  diag_extra.Print(param.first, param.second);
1095  }
1096  }
1097  }
1098 
1099  // Empty queue name is a synonim for hardcoded 'noname'.
1100  // To have exactly one string comparison, make the name empty if 'noname'
1101  if (NStr::CompareNocase(m_QueueName, "noname") == 0)
1102  m_QueueName = "";
1103 
1104  if (!m_QueueName.empty()) {
1105  CRef<CQueue> queue;
1106  try {
1107  queue = m_Server->OpenQueue(m_QueueName);
1108  m_QueueRef.Reset(queue);
1109  }
1110  catch (const CNetScheduleException & ex) {
1112  // This will form request context with the client IP etc.
1113  CRequestContextResetter context_resetter;
1114  if (m_ConnContext.IsNull())
1116 
1117  ERR_POST(ex);
1119  if (x_WriteMessage("ERR:" + string(ex.GetErrCodeString()) +
1120  ":" + ex.GetMsg() +
1123  return;
1124  }
1125  throw;
1126  }
1127 
1129  }
1130  else
1132 
1135 }
1136 
1137 
1138 // Workhorse method
1140 {
1141  if (x_NeedCmdLogging()) {
1146  }
1147 
1148  SParsedCmd cmd;
1149  string msg;
1150  try {
1151  s_ReadBufToString(buffer, msg);
1153 
1154  // It throws an exception if the input is not valid
1156  cmd.params,
1157  cmd.command->cmd,
1158  x_NeedToGeneratePHIDAndSID(cmd.command->extra.processor),
1159  GetSocket(),
1161 
1163  }
1164  catch (const CNSProtoParserException & ex) {
1165  // This is the exception from m_SingleCmdParser.ParseCommand(msg)
1166 
1167  // Parsing is done before PrintRequestStart(...) so a diag context is
1168  // not created here - create it just to provide an error output
1169  x_OnCmdParserError(true, ex.GetMsg(), "");
1170  return;
1171  }
1172  catch (const CNetScheduleException & ex) {
1173  // This is an exception from AssignValues(...)
1174 
1175  // That is, the print request has not been printed yet
1177  throw;
1178  }
1179 
1180  const SCommandExtra & extra = cmd.command->extra;
1181 
1184  return;
1185  }
1186 
1187 
1188  // If the command requires queue, hold a hard reference to this
1189  // queue from a weak one (m_Queue) in queue_ref, and take C pointer
1190  // into queue_ptr. Otherwise queue_ptr is NULL, which is OK for
1191  // commands which does not require a queue.
1192  CRef<CQueue> queue_ref;
1193  CQueue * queue_ptr = NULL;
1194 
1195  bool restore_client = false;
1196  TNSCommandChecks orig_client_passed_checks = 0;
1197  unsigned int orig_client_id = 0;
1198 
1199  if (extra.checks & eNS_Queue) {
1200  if (x_CanBeWithoutQueue(extra.processor) &&
1202  // This command must use queue name from the job key
1203  queue_ref.Reset(
1206  queue_ptr = queue_ref.GetPointer();
1207 
1210  orig_client_passed_checks = m_ClientId.GetPassedChecks();
1211  orig_client_id = m_ClientId.GetID();
1212  restore_client = true;
1213 
1214  x_UpdateClientPassedChecks(queue_ptr);
1215  }
1216  } else {
1217  // Old fasion way - the queue comes from handshake
1218  if (m_QueueName.empty())
1219  NCBI_THROW(CNetScheduleException, eUnknownQueue,
1220  "Job queue is required");
1221  queue_ref.Reset(GetQueue());
1222  queue_ptr = queue_ref.GetPointer();
1223  }
1224  }
1227  if (!m_QueueName.empty()) {
1228  // The STAT and REFUSESUBMITS commands
1229  // could be with or without a queue
1230  queue_ref.Reset(GetQueue());
1231  queue_ptr = queue_ref.GetPointer();
1232  }
1233  }
1234 
1235  m_ClientId.CheckAccess(extra.checks, m_Server, cmd.command->cmd);
1236 
1238  if (queue_ptr) {
1239  bool client_was_found = false;
1240  bool session_was_reset = false;
1241  string old_session;
1242  bool had_wn_pref_affs = false;
1243  bool had_reader_pref_affs = false;
1244 
1245  // The cient has a queue, so memorize the client
1246  queue_ptr->TouchClientsRegistry(m_ClientId, client_was_found,
1247  session_was_reset, old_session,
1248  had_wn_pref_affs, had_reader_pref_affs);
1249  if (client_was_found && session_was_reset) {
1250  if (x_NeedCmdLogging()) {
1251  string wn_val = "true";
1252  if (!had_wn_pref_affs)
1253  wn_val = "had none";
1254  string reader_val = "true";
1255  if (!had_reader_pref_affs)
1256  reader_val = "had none";
1257 
1258  GetDiagContext().Extra()
1259  .Print("client_node", m_ClientId.GetNode())
1260  .Print("client_session", m_ClientId.GetSession())
1261  .Print("client_old_session", old_session)
1262  .Print("wn_preferred_affinities_reset", wn_val)
1263  .Print("reader_preferred_affinities_reset", reader_val);
1264 
1266  }
1267  }
1268  }
1269 
1270  // Execute the command
1271  (this->*extra.processor)(queue_ptr);
1272 
1273  if (restore_client) {
1274  m_ClientId.SetPassedChecks(orig_client_passed_checks);
1275  m_ClientId.SetID(orig_client_id);
1276  }
1277 }
1278 
1279 
1281 {
1282  // Admin flag is not reset because it comes from a handshake stage and
1283  // cannot be changed later.
1285 
1286  // First, deal with a queue
1287  if (q == NULL)
1288  return;
1289 
1291  if (!m_ClientId.IsAdmin()) {
1292  // Admin can do everything, so there is no need to check
1293  // the hosts and programs for non-admins only
1302  }
1303 
1304  // Also, update the client scope
1306 }
1307 
1308 
1309 // Message processors for x_ProcessSubmitBatch
1311 {
1312  // Expecting BTCH size | ENDS
1313  try {
1314  string msg;
1315  s_ReadBufToString(buffer, msg);
1316 
1318  CTempString size_str = cmd.params["size"];
1319 
1320  if (!size_str.empty())
1321  m_BatchSize = NStr::StringToInt(size_str);
1322  else
1323  m_BatchSize = 0;
1324  (this->*cmd.command->extra.processor)(0);
1325  }
1326  catch (const CNSProtoParserException & ex) {
1327  m_WithinBatchSubmit = false;
1329  x_OnCmdParserError(false, ex.GetMsg(), ", BTCH or ENDS expected");
1330  return;
1331  }
1332  catch (const CNetScheduleException & ex) {
1333  m_WithinBatchSubmit = false;
1335  ERR_POST("Server error: " << ex);
1336  x_SetCmdRequestStatus(ex.ErrCodeToHTTPStatusCode());
1337  x_WriteMessage("ERR:" + string(ex.GetErrCodeString()) +
1338  ":" + ex.GetMsg() + kEndOfResponse);
1340 
1341  m_BatchJobs.clear();
1343  }
1344  catch (const CException & ex) {
1345  m_WithinBatchSubmit = false;
1347  ERR_POST("Error processing command: " << ex);
1349  x_WriteMessage("ERR:eProtocolSyntaxError:"
1350  "Error processing BTCH or ENDS." + kEndOfResponse);
1352 
1353  m_BatchJobs.clear();
1355  }
1356  catch (...) {
1357  m_WithinBatchSubmit = false;
1359  ERR_POST("Unknown error while expecting BTCH or ENDS");
1361  x_WriteMessage("ERR:eInternalError:Unknown error "
1362  "while expecting BTCH or ENDS." + kEndOfResponse);
1364 
1365  m_BatchJobs.clear();
1367  }
1368 }
1369 
1370 
1372 {
1373  // Expecting:
1374  // "input" [aff="affinity_token"] [msk=1]
1375  string msg;
1376  s_ReadBufToString(buffer, msg);
1377 
1378  CJob & job = m_BatchJobs[m_BatchPos].first;
1379  TNSProtoParams params;
1380  try {
1382  m_CommandArguments.AssignValues(params, "", false, GetSocket(),
1384  }
1385  catch (const CNSProtoParserException & ex) {
1386  m_WithinBatchSubmit = false;
1388  x_OnCmdParserError(false, ex.GetMsg(), "");
1389  return;
1390  }
1391  catch (const CNetScheduleException & ex) {
1392  m_WithinBatchSubmit = false;
1394  ERR_POST(ex.GetMsg());
1395  x_SetCmdRequestStatus(ex.ErrCodeToHTTPStatusCode());
1396  x_WriteMessage("ERR:" + string(ex.GetErrCodeString()) +
1397  ":" + ex.GetMsg() + kEndOfResponse);
1399 
1400  m_BatchJobs.clear();
1402  return;
1403  }
1404  catch (const CException & ex) {
1405  m_WithinBatchSubmit = false;
1407  ERR_POST("Error processing command: " << ex);
1409  x_WriteMessage("ERR:eProtocolSyntaxError:"
1410  "Invalid batch submission, syntax error" +
1411  kEndOfResponse);
1413 
1414  m_BatchJobs.clear();
1416  return;
1417  }
1418  catch (...) {
1419  m_WithinBatchSubmit = false;
1421  ERR_POST("Arguments parsing unknown exception. "
1422  "Batch submit is rejected.");
1424  x_WriteMessage("ERR:eProtocolSyntaxError:"
1425  "Arguments parsing unknown exception" +
1426  kEndOfResponse);
1428 
1429  m_BatchJobs.clear();
1431  return;
1432  }
1433 
1435 
1436  // See CXX-8843: no affinity is replaced with "-" affinity automatically
1437  if (m_CommandArguments.affinity_token.empty())
1439 
1441 
1448 
1449  if (++m_BatchPos >= m_BatchSize)
1451 }
1452 
1453 
1455 {
1456  // See CXX-8253
1457  return GetDiagContext().GetStringUID() + "_" +
1458  to_string(m_ConnContext->GetRequestID());
1459 }
1460 
1461 
1463 {
1464  // Expecting ENDB
1465  try {
1466  string msg;
1467  s_ReadBufToString(buffer, msg);
1469  }
1470  catch (const CNSProtoParserException & ex) {
1471  m_WithinBatchSubmit = false;
1473  x_OnCmdParserError(false, ex.GetMsg(), ", ENDB expected");
1474  return;
1475  }
1476  catch (const CException & ex) {
1477  m_WithinBatchSubmit = false;
1480  ERR_POST("Error processing command: " << ex);
1482  x_WriteMessage("ERR:eProtocolSyntaxError:"
1483  "Batch submit error - unexpected end of batch" +
1484  kEndOfResponse);
1486 
1487  m_BatchJobs.clear();
1489  return;
1490  }
1491  catch (...) {
1492  m_WithinBatchSubmit = false;
1494  ERR_POST("Unknown error while expecting ENDB.");
1496  x_WriteMessage("ERR:eInternalError:"
1497  "Unknown error while expecting ENDB." +
1498  kEndOfResponse);
1500 
1501  m_BatchJobs.clear();
1503  return;
1504  }
1505 
1506  double comm_elapsed = m_BatchStopWatch.Elapsed();
1507 
1508  // BTCH logging is in a separate context
1510  try {
1511  if (x_NeedCmdLogging()) {
1513  ctx->SetRequestID();
1516  .Print("_type", "cmd")
1517  .Print("conn", x_GetConnRef())
1518  .Print("_queue", m_QueueName)
1519  .Print("bsub", m_CmdContext->GetRequestID())
1520  .Print("cmd", "BTCH")
1521  .Print("size", m_BatchJobs.size());
1522  ctx->SetRequestStatus(CNetScheduleHandler::eStatus_OK);
1523  }
1524 
1525  // we have our batch now
1528  unsigned job_id = GetQueue()->SubmitBatch(m_ClientId,
1529  m_BatchJobs,
1530  m_BatchGroup,
1531  x_NeedCmdLogging(),
1533  double db_elapsed = sw.Elapsed();
1534 
1535  if (x_NeedCmdLogging())
1536  GetDiagContext().Extra()
1537  .Print("start_job", job_id)
1538  .Print("commit_time",
1539  NStr::DoubleToString(comm_elapsed, 4,
1541  .Print("transaction_time",
1542  NStr::DoubleToString(db_elapsed, 4,
1544 
1545  x_WriteMessage("OK:" + to_string(job_id) + " " +
1546  m_Server->GetHost().c_str() + " " +
1547  to_string(unsigned(m_Server->GetPort())) +
1548  kEndOfResponse);
1550  }
1551  catch (const CNetScheduleException & ex) {
1552  m_WithinBatchSubmit = false;
1554  if (x_NeedCmdLogging()) {
1556  SetRequestStatus(ex.ErrCodeToHTTPStatusCode());
1558  GetDiagContext().SetRequestContext(current_context);
1559  }
1560  m_BatchJobs.clear();
1562  throw;
1563  }
1564  catch (...) {
1565  m_WithinBatchSubmit = false;
1567  if (x_NeedCmdLogging()) {
1569  SetRequestStatus(eStatus_ServerError);
1571  GetDiagContext().SetRequestContext(current_context);
1572  }
1573  m_BatchJobs.clear();
1575  throw;
1576  }
1577 
1578  if (x_NeedCmdLogging()) {
1580  GetDiagContext().SetRequestContext(current_context);
1581  }
1582 
1584 }
1585 
1586 
1587 //////////////////////////////////////////////////////////////////////////
1588 // Process* methods for processing commands
1589 
1591 {
1592  bool cmdv2(m_CommandArguments.cmd == "SST2");
1593  CNSPreciseTime lifetime;
1594  CJob job;
1597  job, &lifetime);
1598 
1599 
1600  if (status == CNetScheduleAPI::eJobNotFound) {
1602  << " for unknown job: "
1605 
1606  if (cmdv2)
1608  else
1610  to_string((int) status) + kEndOfResponse);
1611  } else {
1612  if (cmdv2) {
1613  CQueue::TPauseStatus pause_status = q->GetPauseStatus();
1614  string reply;
1615  reply.reserve(1024);
1616 
1617  reply.append("OK:job_status=")
1618  .append(CNetScheduleAPI::StatusToString(status))
1619  .append("&job_exptime=")
1620  .append(to_string(lifetime.Sec()));
1621 
1622  if (pause_status == CQueue::ePauseWithPullback)
1623  reply.append("&pause=pullback");
1624  else if (pause_status == CQueue::ePauseWithoutPullback)
1625  reply.append("&pause=nopullback");
1626 
1628  reply.append("&msg=")
1629  .append(NStr::URLEncode(job.GetProgressMsg()));
1630 
1631  x_WriteMessage(reply);
1632  }
1633  else
1634  x_WriteMessage(kOKResponsePrefix + to_string((int) status) +
1635  kEndOfResponse);
1636  x_LogCommandWithJob(job);
1637  }
1639 }
1640 
1641 
1643 {
1644  bool cmdv2(m_CommandArguments.cmd == "WST2");
1645  CNSPreciseTime lifetime;
1646  string client_ip;
1647  string client_sid;
1648  string client_phid;
1649  string progress_msg;
1651  client_ip, client_sid,
1652  client_phid, progress_msg,
1653  &lifetime);
1654 
1655 
1656  if (status == CNetScheduleAPI::eJobNotFound) {
1658  << " for unknown job: "
1661 
1662  if (cmdv2)
1664  else
1665  x_WriteMessage(kOKResponsePrefix + to_string((int) status) +
1666  kEndOfResponse);
1667  } else {
1668  if (cmdv2) {
1669  CQueue::TPauseStatus pause_status = q->GetPauseStatus();
1670  string reply;
1671 
1672  reply.reserve(1024);
1673  reply.append("OK:job_status=")
1674  .append(CNetScheduleAPI::StatusToString(status))
1675  .append("&job_exptime=")
1676  .append(to_string(lifetime.Sec()));
1677 
1678  if (pause_status == CQueue::ePauseWithPullback)
1679  reply.append("&pause=pullback");
1680  else if (pause_status == CQueue::ePauseWithoutPullback)
1681  reply.append("&pause=nopullback");
1682 
1684  reply.append("&msg=")
1685  .append(NStr::URLEncode(progress_msg));
1686 
1687  x_WriteMessage(reply);
1688  }
1689  else
1690  x_WriteMessage(kOKResponsePrefix + to_string((int) status) +
1691  kEndOfResponse);
1692  x_LogCommandWithJob(client_ip, client_sid, client_phid);
1693  }
1695 }
1696 
1697 
1699 {
1700  // This functionality requires client name and the session
1701  x_CheckNonAnonymousClient("use " + m_CommandArguments.cmd + " command");
1702 
1703  if (m_CommandArguments.aff_to_add.empty() &&
1704  m_CommandArguments.aff_to_del.empty()) {
1706  << " with neither add list nor del list");
1708  x_WriteMessage("ERR:eInvalidParameter:" + kEndOfResponse);
1710  return;
1711  }
1712 
1713  list<string> aff_to_add_list;
1714  list<string> aff_to_del_list;
1715 
1717  "\t,", aff_to_add_list);
1719  "\t,", aff_to_del_list);
1720 
1721  // CXX-8843: remove '-' affinity if so
1722  aff_to_add_list.remove(k_NoAffinityToken);
1723  aff_to_del_list.remove(k_NoAffinityToken);
1724 
1725  // Check that the same affinity has not been mentioned in both add and del
1726  // lists
1727  for (list<string>::const_iterator k = aff_to_add_list.begin();
1728  k != aff_to_add_list.end(); ++k) {
1729  if (find(aff_to_del_list.begin(), aff_to_del_list.end(), *k) !=
1730  aff_to_del_list.end()) {
1732  x_WriteMessage("ERR:eInvalidParameter:Affinity " + *k +
1733  " is in both add and del lists" +
1734  kEndOfResponse);
1736  return;
1737  }
1738  }
1739 
1740 
1741  // Here: prerequisites have been checked
1743  GetDiagContext().Extra()
1744  .Print("client_node", m_ClientId.GetNode())
1745  .Print("client_session", m_ClientId.GetSession());
1746 
1747  ECommandGroup cmd_group = eGet;
1748  if (m_CommandArguments.cmd == "CHRAFF")
1749  cmd_group = eRead;
1750 
1751  list<string> msgs = q->ChangeAffinity(m_ClientId, aff_to_add_list,
1752  aff_to_del_list,
1753  cmd_group);
1754  if (msgs.empty())
1756  else {
1757  string msg;
1758  for (list<string>::const_iterator k = msgs.begin();
1759  k != msgs.end(); ++k)
1760  msg += "WARNING:" + *k +";";
1762  }
1764 }
1765 
1766 
1768 {
1769  // This functionality requires client name and the session
1770  x_CheckNonAnonymousClient("use " + m_CommandArguments.cmd + " command");
1771 
1773  GetDiagContext().Extra()
1774  .Print("client_node", m_ClientId.GetNode())
1775  .Print("client_session", m_ClientId.GetSession());
1776 
1777  list<string> aff_to_set;
1779  "\t,", aff_to_set);
1780 
1781  // CXX-8843: remove '-' affinity if so
1782  aff_to_set.remove(k_NoAffinityToken);
1783 
1784  ECommandGroup cmd_group = eGet;
1785  if (m_CommandArguments.cmd == "SETRAFF")
1786  cmd_group = eRead;
1787 
1788  q->SetAffinity(m_ClientId, aff_to_set, cmd_group);
1789 
1792 }
1793 
1794 
1796 {
1797  if (q->GetRefuseSubmits() || m_Server->GetRefuseSubmits()) {
1799  x_WriteMessage("ERR:eSubmitsDisabled:" + kEndOfResponse);
1801  return;
1802  }
1803 
1806  // This is a drained shutdown mode
1809  x_WriteMessage("ERR:eSubmitsDisabled:" + kEndOfResponse);
1811  return;
1812  }
1813 
1814  // CXX-8843: if there is no affinity then replace it with the '-' affinity
1815  if (m_CommandArguments.affinity_token.empty())
1817  // CXX-8843: if there is no group then replace it with the '-' group
1818  if (m_CommandArguments.group.empty())
1820 
1821  CJob job(m_CommandArguments);
1822  try {
1824  unsigned int job_id = q->Submit(m_ClientId, job,
1827  x_NeedCmdLogging(),
1829 
1831  kEndOfResponse);
1834  } catch (...) {
1836  throw;
1837  }
1838 
1839  // There is no need to log the job key, it is logged at lower level
1840  // together with all the submitted job parameters
1842 }
1843 
1844 
1846 {
1847  if (q->GetRefuseSubmits() || m_Server->GetRefuseSubmits()) {
1849  x_WriteMessage("ERR:eSubmitsDisabled:" + kEndOfResponse);
1851  return;
1852  }
1853 
1856  // This is a drained shutdown mode
1859  x_WriteMessage("ERR:eSubmitsDisabled:" + kEndOfResponse);
1861  return;
1862  }
1863 
1864  try {
1865  // Memorize the fact that batch submit started
1866  m_WithinBatchSubmit = true;
1867 
1868  // CXX-8843: if no group is provided it is overwritten with '-' group
1869  // automatically
1870  if (m_CommandArguments.group.empty())
1872 
1879 
1880  x_WriteMessage("OK:Batch submit ready" + kEndOfResponse);
1882  }
1883  catch (...) {
1884  // WriteMessage can generate an exception
1885  m_WithinBatchSubmit = false;
1888  throw;
1889  }
1890 }
1891 
1892 
1894 {
1895  m_BatchPos = 0;
1897  m_BatchJobs.resize(m_BatchSize);
1898  if (m_BatchSize)
1900  else
1901  // Unfortunately, because batches can be generated by
1902  // client program, we better honor zero size ones.
1903  // Skip right to waiting for 'ENDB'.
1905 }
1906 
1907 
1909 {
1910  m_WithinBatchSubmit = false;
1912  m_BatchJobs.clear();
1916 }
1917 
1918 
1920 {
1921  // Job key or a group or an affinity or a status must be given
1922  if (m_CommandArguments.job_id == 0 &&
1923  m_CommandArguments.group.empty() &&
1926  if (x_NeedCmdLogging())
1927  ERR_POST(Warning <<
1928  "Neither job key nor a group nor an "
1929  "affinity nor a status list is provided "
1930  "for the CANCEL command");
1932  x_WriteMessage("ERR:eInvalidParameter:"
1933  "Job key or any combination of a group and an affinity "
1934  "and job statuses must be given" + kEndOfResponse);
1936  return;
1937  }
1938 
1939  if (m_CommandArguments.job_id != 0 &&
1940  (!m_CommandArguments.group.empty() ||
1943  if (x_NeedCmdLogging())
1944  ERR_POST(Warning <<
1945  "CANCEL can accept either a job "
1946  "key or any combination of a group "
1947  "and an affinity and job statuses");
1949  x_WriteMessage("ERR:eInvalidParameter:CANCEL can accept either a job "
1950  "key or any combination of a group and an affinity and "
1951  "job statuses" + kEndOfResponse);
1953  return;
1954  }
1955 
1956 
1957  // Here: arguments are checked. It is a certain job or any combination of
1958  // a group and an affinity and job statuses
1959  if (!m_CommandArguments.group.empty() ||
1962  // CANCEL for a group and/or affinity and or job statuses
1963 
1964  vector<string> warnings;
1965  vector<TJobStatus> statuses;
1966  if (!m_CommandArguments.job_statuses_string.empty()) {
1967 
1968  bool reported = false;
1969  vector<TJobStatus>::iterator k =
1971  while (k != m_CommandArguments.job_statuses.end() ) {
1972  if (*k == CNetScheduleAPI::eJobNotFound) {
1973  if (!reported) {
1974  warnings.push_back("eInvalidJobStatus:unknown job "
1975  "status in the status list");
1976  if (x_NeedCmdLogging())
1977  ERR_POST(Warning <<
1978  "Unknown job status in the status list. "
1979  "Ignore and continue.");
1980  reported = true;
1981  }
1982  k = m_CommandArguments.job_statuses.erase(k);
1983  } else
1984  ++k;
1985  }
1986 
1987  statuses = x_RemoveDuplicateStatuses(
1988  m_CommandArguments.job_statuses, warnings);
1989 
1990  // Here: no duplicates, no unresolved states. Check if there is
1991  // the 'Canceled' state
1992  k = statuses.begin();
1993  while (k != statuses.end()) {
1994  if (*k == CNetScheduleAPI::eCanceled) {
1995  warnings.push_back("eIgnoringCanceledStatus:attempt to "
1996  "cancel jobs in the 'Canceled' status");
1997  if (x_NeedCmdLogging())
1998  ERR_POST(Warning <<
1999  "Attempt to cancel jobs in the 'Canceled' "
2000  "status. Ignore and continue.");
2001  statuses.erase(k);
2002  break;
2003  }
2004  ++k;
2005  }
2006  }
2007 
2008  unsigned int count = q->CancelSelectedJobs(
2009  m_ClientId,
2012  statuses,
2013  x_NeedCmdLogging(), warnings);
2014  if (warnings.empty())
2015  x_WriteMessage("OK:" + to_string(count) + kEndOfResponse);
2016  else {
2017  string msg;
2018  for (vector<string>::const_iterator k = warnings.begin();
2019  k != warnings.end(); ++k) {
2020  msg += "WARNING:" + *k + ";";
2021  }
2022  x_WriteMessage("OK:" + msg + to_string(count) + kEndOfResponse);
2023  }
2024 
2026  return;
2027  }
2028 
2029  // Here: CANCEL for a job
2030  CJob job;
2031  switch (q->Cancel(m_ClientId,
2033  m_CommandArguments.job_key, job)) {
2035  if (x_NeedCmdLogging())
2036  ERR_POST(Warning <<
2037  "CANCEL for unknown job: " <<
2040  x_WriteMessage("OK:WARNING:eJobNotFound:Job not found;0" +
2041  kEndOfResponse);
2042  break;
2044  x_WriteMessage("OK:WARNING:eJobAlreadyCanceled:"
2045  "Already canceled;0" + kEndOfResponse);
2046  x_LogCommandWithJob(job);
2047  break;
2048  default:
2049  x_WriteMessage("OK:1" + kEndOfResponse);
2050  x_LogCommandWithJob(job);
2051  }
2053 }
2054 
2055 
2057 {
2058  CJob job;
2059  bool cmdv2 = (m_CommandArguments.cmd == "STATUS2");
2060  CNSPreciseTime lifetime;
2061 
2063  job, &lifetime) == CNetScheduleAPI::eJobNotFound) {
2064  // Here: there is no such a job
2066  << " for unknown job: "
2069  if (cmdv2)
2071  else
2073  to_string((int)CNetScheduleAPI::eJobNotFound) +
2074  kEndOfResponse);
2076  return;
2077  }
2078 
2079  // Here: the job was found
2080  string reply;
2081  reply.reserve(2048);
2082 
2083  if (cmdv2) {
2084  string pause_status_msg;
2085  CQueue::TPauseStatus pause_status = q->GetPauseStatus();
2086 
2087  if (pause_status == CQueue::ePauseWithPullback)
2088  pause_status_msg = "&pause=pullback";
2089  else if (pause_status == CQueue::ePauseWithoutPullback)
2090  pause_status_msg = "&pause=nopullback";
2091 
2092  string progress_msg_part;
2094  progress_msg_part.append("&msg=")
2095  .append(NStr::URLEncode(job.GetProgressMsg()));
2096 
2097  reply.append("OK:")
2098  .append("job_status=")
2100  .append("&client_ip=")
2101  .append(NStr::URLEncode(job.GetClientIP()))
2102  .append("&client_sid=")
2103  .append(NStr::URLEncode(job.GetClientSID()))
2104  .append("&ncbi_phid=")
2105  .append(NStr::URLEncode(job.GetNCBIPHID()))
2106  .append("&job_exptime=")
2107  .append(to_string(lifetime.Sec()))
2108  .append("&ret_code=")
2109  .append(to_string(job.GetRetCode()))
2110  .append("&output=")
2111  .append(NStr::URLEncode(job.GetOutput()))
2112  .append("&err_msg=")
2113  .append(NStr::URLEncode(job.GetErrorMsg()))
2114  .append("&input=")
2115  .append(NStr::URLEncode(job.GetInput()))
2116  .append(pause_status_msg)
2117  .append(progress_msg_part)
2118  .append(kEndOfResponse);
2119  } else {
2120  reply.append("OK:")
2121  .append(to_string((int) job.GetStatus()))
2122  .append(1, ' ')
2123  .append(to_string(job.GetRetCode()))
2124  .append(" \"")
2125  .append(NStr::PrintableString(job.GetOutput()))
2126  .append("\" \"")
2127  .append(NStr::PrintableString(job.GetErrorMsg()))
2128  .append("\" \"")
2129  .append(NStr::PrintableString(job.GetInput()))
2130  .append("\"")
2131  .append(kEndOfResponse);
2132  }
2133 
2134  x_WriteMessage(reply);
2135  x_LogCommandWithJob(job);
2137 }
2138 
2139 
2141 {
2142  // GET & WGET are first versions of the command
2143  bool cmdv2(m_CommandArguments.cmd == "GET2");
2144 
2145  if (cmdv2) {
2146  x_CheckNonAnonymousClient("use GET2 command");
2149  }
2150  else {
2151  // The affinity options are only for the second version of the command
2154 
2155  // The old clients must have any_affinity set to true
2156  // depending on the explicit affinity - to conform the old behavior
2159  }
2160 
2161  // Check if the queue is paused
2162  CQueue::TPauseStatus pause_status = q->GetPauseStatus();
2163  if (pause_status != CQueue::eNoPause) {
2164 
2165  if (m_CommandArguments.timeout != 0)
2168  cmdv2);
2169 
2170  string pause_status_str;
2171 
2172  if (pause_status == CQueue::ePauseWithPullback)
2173  pause_status_str = "pullback";
2174  else
2175  pause_status_str = "nopullback";
2176 
2177  if (cmdv2)
2178  x_WriteMessage("OK:pause=" + pause_status_str + kEndOfResponse);
2179  else
2181 
2182  if (x_NeedCmdLogging())
2183  GetDiagContext().Extra().Print("job_key", "None")
2184  .Print("reason",
2185  "pause: " + pause_status_str);
2186 
2188  return;
2189  }
2190 
2191 
2192  list<string> aff_list;
2194  "\t,", aff_list);
2195  list<string> group_list;
2197  "\t,", group_list);
2198 
2199  CJob job;
2200  string added_pref_aff;
2202  if (q->GetJobOrWait(m_ClientId,
2205  CNSPreciseTime::Current(), &aff_list,
2210  cmdv2,
2211  &group_list,
2212  &job,
2214  added_pref_aff) == false) {
2215  // Preferred affinities were reset for the client, so no job
2216  // and bad request
2218  x_WriteMessage("ERR:ePrefAffExpired:" + kEndOfResponse);
2219  } else {
2220  if (job.GetId())
2221  x_LogCommandWithJob(job);
2222 
2223  if (!added_pref_aff.empty()) {
2224  if (x_NeedCmdLogging()) {
2226  GetDiagContext().Extra()
2227  .Print("added_preferred_affinity", added_pref_aff);
2228  else
2229  GetDiagContext().Extra()
2230  .Print("client_node", m_ClientId.GetNode())
2231  .Print("client_session", m_ClientId.GetSession())
2232  .Print("added_preferred_affinity", added_pref_aff);
2233  }
2234  }
2235  x_PrintGetJobResponse(q, job, cmdv2);
2237  }
2238 
2240 }
2241 
2242 
2244 {
2245  x_CheckNonAnonymousClient("cancel waiting after WGET");
2246 
2250 }
2251 
2252 
2254 {
2255  x_CheckNonAnonymousClient("cancel waiting after READ");
2256 
2260 }
2261 
2262 
2264 {
2265  bool cmdv2(m_CommandArguments.cmd == "PUT2");
2266 
2267  if (cmdv2) {
2268  x_CheckNonAnonymousClient("use PUT2 command");
2270  }
2271 
2272  CJob job;
2276  job,
2280  if (old_status == CNetScheduleAPI::ePending ||
2281  old_status == CNetScheduleAPI::eRunning) {
2283  x_LogCommandWithJob(job);
2285  return;
2286  }
2287  if (old_status == CNetScheduleAPI::eFailed) {
2288  // Still accept the job results, but print a warning: CXX-3632
2289  ERR_POST(Warning << "Accepting results for a job in the FAILED state.");
2291  x_LogCommandWithJob(job);
2293  return;
2294  }
2295  if (old_status == CNetScheduleAPI::eDone) {
2296  ERR_POST(Warning << "Cannot accept job "
2298  << " results. The job has already been done.");
2299  x_WriteMessage("OK:WARNING:eJobAlreadyDone:Already done;" +
2300  kEndOfResponse);
2301  x_LogCommandWithJob(job);
2303  return;
2304  }
2305  if (old_status == CNetScheduleAPI::eJobNotFound) {
2306  ERR_POST(Warning << "Cannot accept job "
2308  << " results. The job is unknown");
2312  return;
2313  }
2314 
2315  // Here: invalid job status, nothing will be done
2316  ERR_POST(Warning << "Cannot accept job "
2318  << " results; job is in "
2319  << CNetScheduleAPI::StatusToString(old_status)
2320  << " state");
2322  x_WriteMessage("ERR:eInvalidJobStatus:"
2323  "Cannot accept job results; job is in " +
2324  CNetScheduleAPI::StatusToString(old_status) + " state" +
2325  kEndOfResponse);
2326  x_LogCommandWithJob(job);
2328 }
2329 
2330 
2332 {
2333  // The JXCG command is used only by old clients. All new client should use
2334  // PUT2 + GET2 sequence.
2335  // The old clients must have any_affinity set to true
2336  // depending on the explicit affinity - to conform the old behavior
2338 
2339 
2341 
2342  // PUT part
2343  CJob job;
2344  TJobStatus old_status = q->PutResult(m_ClientId, curr,
2347  job,
2351 
2352  if (old_status == CNetScheduleAPI::eJobNotFound) {
2353  ERR_POST(Warning << "Cannot accept job "
2355  << " results. The job is unknown");
2356  } else if (old_status == CNetScheduleAPI::eFailed) {
2357  // Still accept the job results, but print a warning: CXX-3632
2358  ERR_POST(Warning << "Accepting results for a job in the FAILED state.");
2359  x_LogCommandWithJob(job);
2360  } else if (old_status != CNetScheduleAPI::ePending &&
2361  old_status != CNetScheduleAPI::eRunning) {
2362  x_LogCommandWithJob(job);
2363  ERR_POST(Warning << "Cannot accept job "
2365  << " results. The job has already been done.");
2366  } else {
2367  x_LogCommandWithJob(job);
2368  }
2369 
2370 
2371  // Get part
2372  CQueue::TPauseStatus pause_status = q->GetPauseStatus();
2373  if (pause_status != CQueue::eNoPause) {
2374 
2375  if (m_CommandArguments.timeout != 0)
2378  false);
2379 
2381  if (x_NeedCmdLogging()) {
2382  string pause_status_str;
2383 
2384  if (pause_status == CQueue::ePauseWithPullback)
2385  pause_status_str = "pullback";
2386  else
2387  pause_status_str = "nopullback";
2388 
2389  GetDiagContext().Extra().Print("job_key", "None")
2390  .Print("reason",
2391  "pause: " + pause_status_str);
2392  }
2394  return;
2395  }
2396 
2397  list<string> aff_list;
2399  "\t,", aff_list);
2400 
2401  string added_pref_aff;
2403  if (q->GetJobOrWait(m_ClientId,
2406  curr, &aff_list,
2409  false,
2410  false,
2411  false,
2412  NULL,
2413  &job,
2415  added_pref_aff) == false) {
2416  // Preferred affinities were reset for the client, or the client had
2417  // been garbage collected so no job and bad request
2419  x_WriteMessage("ERR:ePrefAffExpired:" + kEndOfResponse);
2420  } else {
2421  if (added_pref_aff.empty() == false) {
2422  if (x_NeedCmdLogging()) {
2424  GetDiagContext().Extra()
2425  .Print("added_preferred_affinity", added_pref_aff);
2426  else
2427  GetDiagContext().Extra()
2428  .Print("client_node", m_ClientId.GetNode())
2429  .Print("client_session", m_ClientId.GetSession())
2430  .Print("added_preferred_affinity", added_pref_aff);
2431  }
2432  }
2433  x_PrintGetJobResponse(q, job, false);
2435  }
2436 
2438 }
2439 
2440 
2442 {
2443  CJob job;
2444 
2448  x_LogCommandWithJob(job);
2449  } else {
2450  ERR_POST(Warning << "MPUT for unknown job "
2454  }
2456 }
2457 
2458 
2460 {
2461  CJob job;
2462  CNSPreciseTime lifetime;
2463 
2465  job, &lifetime) != CNetScheduleAPI::eJobNotFound) {
2467  kEndOfResponse);
2468  x_LogCommandWithJob(job);
2469  } else {
2471  << "MGET for unknown job "
2475  }
2477 }
2478 
2479 
2481 {
2482  bool cmdv2(m_CommandArguments.cmd == "FPUT2");
2483 
2484  if (cmdv2) {
2485  x_CheckNonAnonymousClient("use FPUT2 command");
2487  }
2488 
2489  CJob job;
2490  string warning;
2491  TJobStatus old_status = q->FailJob(
2492  m_ClientId,
2495  job,
2501  warning);
2502 
2503  if (old_status == CNetScheduleAPI::eJobNotFound) {
2504  ERR_POST(Warning << "FPUT for unknown job "
2509  return;
2510  }
2511 
2512  if (old_status == CNetScheduleAPI::eFailed) {
2513  ERR_POST(Warning << "FPUT for already failed job "
2515  x_WriteMessage("OK:WARNING:eJobAlreadyFailed:Already failed;" +
2516  kEndOfResponse);
2517  x_LogCommandWithJob(job);
2519  return;
2520  }
2521 
2522  if (old_status != CNetScheduleAPI::eRunning) {
2523  ERR_POST(Warning << "Cannot fail job "
2525  << "; job is in "
2526  << CNetScheduleAPI::StatusToString(old_status)
2527  << " state");
2529  x_WriteMessage("ERR:eInvalidJobStatus:Cannot fail job; job is in " +
2530  CNetScheduleAPI::StatusToString(old_status) + " state" +
2531  kEndOfResponse);
2532  x_LogCommandWithJob(job);
2534  return;
2535  }
2536 
2537  // Here: all is fine
2538  if (warning.empty())
2540  else
2541  x_WriteMessage("OK:WARNING:" + warning + ";" + kEndOfResponse);
2542  x_LogCommandWithJob(job);
2544 }
2545 
2546 
2548 {
2549  // The DROPQ implementation has been changed in NS 4.23.2
2550  // Earlier it was removing jobs from the queue
2551  // Starting from NS 4.23.2 it is cancelling all the jobs, i.e
2552  // DROPQ became an equivalent of the CANCELQ command except of the
2553  // output. DROPQ provides OK: while CANCELQ provides OK:N where N is
2554  // the number of the canceled jobs.
2558 }
2559 
2560 
2562 {
2563  bool cmdv2(m_CommandArguments.cmd == "RETURN2");
2565 
2566  if (cmdv2) {
2567  x_CheckNonAnonymousClient("use RETURN2 command");
2569 
2571  return_option = CQueue::eWithBlacklist;
2572  else
2573  return_option = CQueue::eWithoutBlacklist;
2574  }
2575 
2576  CJob job;
2577  string warning;
2578  TJobStatus old_status = q->ReturnJob(m_ClientId,
2581  job,
2583  warning, return_option);
2584 
2585  if (old_status == CNetScheduleAPI::eRunning) {
2586  if (warning.empty())
2588  else
2589  x_WriteMessage("OK:WARNING:" + warning + ";" + kEndOfResponse);
2591  x_LogCommandWithJob(job);
2592  return;
2593  }
2594 
2595  if (old_status == CNetScheduleAPI::eJobNotFound) {
2596  ERR_POST(Warning << "RETURN for unknown job "
2601  return;
2602  }
2603 
2604  ERR_POST(Warning << "Cannot return job "
2606  << "; job is in "
2607  << CNetScheduleAPI::StatusToString(old_status)
2608  << " state");
2610  x_WriteMessage("ERR:eInvalidJobStatus:Cannot return job; job is in " +
2611  CNetScheduleAPI::StatusToString(old_status) + " state" +
2612  kEndOfResponse);
2613 
2614  x_LogCommandWithJob(job);
2616 }
2617 
2618 
2620 {
2621  x_CheckNonAnonymousClient("use RESCHEDULE command");
2623 
2624  CJob job;
2625  bool auth_token_ok = true;
2626 
2627  // CXX-8843: replace no affinity with "-" affinity
2628  if (m_CommandArguments.affinity_token.empty())
2630 
2631  // CXX-8843: replace no group with "-" group
2632  if (m_CommandArguments.group.empty())
2634 
2635  TJobStatus old_status = q->RescheduleJob(
2636  m_ClientId,
2642  auth_token_ok,
2643  job);
2644 
2645  if (!auth_token_ok) {
2646  ERR_POST(Warning << "Invalid authorization token");
2648  x_WriteMessage("ERR:eInvalidAuthToken:" + kEndOfResponse);
2650  return;
2651  }
2652 
2653  if (old_status == CNetScheduleAPI::eRunning) {
2656  x_LogCommandWithJob(job);
2657  return;
2658  }
2659 
2660  if (old_status == CNetScheduleAPI::eJobNotFound) {
2661  ERR_POST(Warning << "RESCHEDULE for unknown job "
2666  return;
2667  }
2668 
2669  ERR_POST(Warning << "Cannot reschedule job "
2671  << "; job is in "
2672  << CNetScheduleAPI::StatusToString(old_status)
2673  << " state");
2675  x_WriteMessage("ERR:eInvalidJobStatus:Cannot reschedule job; job is in " +
2676  CNetScheduleAPI::StatusToString(old_status) + " state" +
2677  kEndOfResponse);
2678 
2679  x_LogCommandWithJob(job);
2681 }
2682 
2683 
2685 {
2686  x_CheckNonAnonymousClient("use REDO command");
2687 
2688  CJob job;
2689  TJobStatus old_status = q->RedoJob(m_ClientId,
2692  job);
2693 
2694  if (old_status == CNetScheduleAPI::eJobNotFound) {
2695  ERR_POST(Warning << "REDO for unknown job "
2700  return;
2701  }
2702 
2703  if (old_status == CNetScheduleAPI::ePending ||
2704  old_status == CNetScheduleAPI::eRunning ||
2705  old_status == CNetScheduleAPI::eReading) {
2706  ERR_POST(Warning << "Cannot redo job "
2708  << "; job is in "
2709  << CNetScheduleAPI::StatusToString(old_status)
2710  << " state");
2712  x_WriteMessage("ERR:eInvalidJobStatus:Cannot redo job; job is in " +
2713  CNetScheduleAPI::StatusToString(old_status) + " state" +
2714  kEndOfResponse);
2715  } else {
2717  }
2718 
2719  x_LogCommandWithJob(job);
2721 }
2722 
2723 
2725 {
2726  if (m_CommandArguments.timeout <= 0) {
2727  ERR_POST(Warning << "Invalid timeout "
2729  << " in JDEX for job "
2732  x_WriteMessage("ERR:eInvalidParameter:" + kEndOfResponse);
2734  return;
2735  }
2736 
2737  CJob job;
2740  job, timeout);
2741 
2742  if (status == CNetScheduleAPI::eJobNotFound) {
2743  ERR_POST(Warning << "JDEX for unknown job "
2748  return;
2749  }
2750  if (status != CNetScheduleAPI::eRunning) {
2751  ERR_POST(Warning << "Cannot change expiration for job "
2753  << " in status "
2754  << CNetScheduleAPI::StatusToString(status));
2756  x_WriteMessage("ERR:eInvalidJobStatus:" +
2758  kEndOfResponse);
2759  x_LogCommandWithJob(job);
2761  return;
2762  }
2763 
2764  // Here: the new timeout has been applied
2766  x_LogCommandWithJob(job);
2768 }
2769 
2770 
2772 {
2773  if (m_CommandArguments.timeout <= 0) {
2774  ERR_POST(Warning << "Invalid timeout "
2776  << " in JDREX for job "
2779  x_WriteMessage("ERR:eInvalidParameter:" + kEndOfResponse);
2781  return;
2782  }
2783 
2784  CJob job;
2786  TJobStatus status = q->JobDelayReadExpiration(
2788  job, timeout);
2789 
2790  if (status == CNetScheduleAPI::eJobNotFound) {
2791  ERR_POST(Warning << "JDREX for unknown job "
2796  return;
2797  }
2798  if (status != CNetScheduleAPI::eReading) {
2799  ERR_POST(Warning << "Cannot change read expiration for job "
2801  << " in status "
2802  << CNetScheduleAPI::StatusToString(status));
2804  x_WriteMessage("ERR:eInvalidJobStatus:" +
2806  kEndOfResponse);
2807  x_LogCommandWithJob(job);
2809  return;
2810  }
2811 
2812  // Here: the new timeout has been applied
2814  x_LogCommandWithJob(job);
2816 }
2817 
2818 
2820 {
2821  size_t last_event_index = 0;
2823  CJob job;
2824  TJobStatus status = q->SetJobListener(
2828  timeout,
2831  &last_event_index);
2832 
2833  if (status == CNetScheduleAPI::eJobNotFound) {
2834  ERR_POST(Warning << "LISTEN for unknown job "
2838  } else {
2839  string progress_msg_part;
2841  progress_msg_part = "&msg=" +
2843 
2844  x_WriteMessage("OK:job_status=" +
2846  "&last_event_index=" +
2847  to_string(last_event_index) +
2848  progress_msg_part +
2849  kEndOfResponse);
2850  x_LogCommandWithJob(job);
2851  }
2852 
2854 }
2855 
2856 
2858 {
2859  CSocket & socket = GetSocket();
2860  const string & what = m_CommandArguments.option;
2862 
2863  if (!what.empty() && what != "QCLASSES" && what != "QUEUES" &&
2864  what != "JOBS" && what != "ALL" && what != "CLIENTS" &&
2865  what != "NOTIFICATIONS" && what != "AFFINITIES" &&
2866  what != "GROUPS" && what != "WNODE" && what != "SERVICES" &&
2867  what != "ALERTS" && what != "SCOPES") {
2868  NCBI_THROW(CNetScheduleException, eInvalidParameter,
2869  "Unsupported '" + what +
2870  "' parameter for the STAT command.");
2871  }
2872 
2873  if (q == NULL && (what == "CLIENTS" || what == "NOTIFICATIONS" ||
2874  what == "AFFINITIES" || what == "GROUPS" ||
2875  what == "WNODE" || what == "SCOPES")) {
2876  NCBI_THROW(CNetScheduleException, eInvalidParameter,
2877  "STAT " + what + " requires a queue");
2878  }
2879 
2880  if (q != NULL)
2882 
2883  if (what == "QCLASSES") {
2884  string info = m_Server->GetQueueClassesInfo();
2885 
2886  if (!info.empty())
2887  info += kEndOfResponse;
2888 
2889  x_WriteMessage(info + "OK:END" + kEndOfResponse);
2891  return;
2892  }
2893  if (what == "QUEUES") {
2894  string info = m_Server->GetQueueInfo();
2895 
2896  if (!info.empty())
2897  info += kEndOfResponse;
2898 
2899  x_WriteMessage(info + "OK:END" + kEndOfResponse);
2901  return;
2902  }
2903  if (what == "SERVICES") {
2904  string output;
2905  map<string, string> services;
2906  m_Server->GetServices(services);
2907  for (map<string, string>::const_iterator k = services.begin();
2908  k != services.end(); ++k) {
2909  if (!output.empty())
2910  output += "&";
2911  output += k->first + "=" + k->second;
2912  }
2915  return;
2916  }
2917  if (what == "ALERTS") {
2918  string output = m_Server->SerializeAlerts();
2919  if (!output.empty())
2921  x_WriteMessage(output + "OK:END" + kEndOfResponse);
2923  return;
2924  }
2925 
2926  if (q == NULL) {
2927  string info;
2928  if (what == "JOBS")
2930  else {
2931  // Transition counters for all the queues
2932  info = "OK:Started: " + m_Server->GetStartTime().AsString() +
2934 
2935  info += "OK:SubmitsDisabledEffective: ";
2936  if (m_Server->GetRefuseSubmits()) info += "1";
2937  else info += "0";
2938  info += kEndOfResponse;
2939 
2940  info += "OK:DrainedShutdown: ";
2941  if (m_Server->IsDrainShutdown()) info += "1";
2942  else info += "0";
2943  info += kEndOfResponse;
2944 
2946  }
2947 
2949  x_WriteMessage(info + "OK:END" + kEndOfResponse);
2951  return;
2952  }
2953 
2954 
2955  socket.DisableOSSendDelay(false);
2956  if (!what.empty() && what != "ALL") {
2957  x_StatisticsNew(q, what, curr);
2958  return;
2959  }
2960 
2961 
2962  string info = "OK:Started: " +
2964 
2965  info += "OK:SubmitsDisabledEffective: ";
2966  if (m_Server->GetRefuseSubmits() || q->GetRefuseSubmits()) info += "1";
2967  else info += "0";
2968  info += kEndOfResponse;
2969 
2970  info += "OK:SubmitsDisabledPrivate: ";
2971  if (q->GetRefuseSubmits()) info += "1";
2972  else info += "0";
2973  info += kEndOfResponse;
2974 
2975  for (size_t k = 0; k < g_ValidJobStatusesSize; ++k) {
2977  unsigned count = q->CountStatus(st);
2978 
2979  info += "OK:" + CNetScheduleAPI::StatusToString(st) + ": " +
2980  to_string(count) + kEndOfResponse;
2981 
2982  if (what == "ALL") {
2983  TNSBitVector::statistics bv_stat;
2984  q->StatusStatistics(st, &bv_stat);
2985  info += "OK:"
2986  " bit_blk=" + to_string(bv_stat.bit_blocks) +
2987  "; gap_blk=" + to_string(bv_stat.gap_blocks) +
2988  "; mem_used=" + to_string(bv_stat.memory_used) +
2990  }
2991  } // for
2992 
2995  "OK:[Transitions counters]:" + kEndOfResponse +
2997  "OK:END" + kEndOfResponse);
2999 }
3000 
3001 
3003 {
3004  // Check permissions explicitly due to additional case (the encrypted admin
3005  // names could be not available)
3008  m_Server->RegisterAlert(eAccess, "admin privileges required "
3009  "to execute RECO");
3010  NCBI_THROW(CNetScheduleException, eAccessDenied,
3011  "Access denied: admin privileges required");
3012  }
3013 
3014  // Unconditionally reload the decryptor in case if the key files are
3015  // changed
3017 
3019  bool reloaded = app->ReloadConfig(
3021 
3022  if (reloaded || m_Server->AnybodyCanReconfigure()) {
3023  const CNcbiRegistry & reg = app->GetConfig();
3024  vector<string> config_warnings;
3025  bool admin_decrypt_error(false); // ignored here
3026  NS_ValidateConfigFile(reg, config_warnings, false, admin_decrypt_error);
3027 
3028  if (!config_warnings.empty()) {
3029  string msg;
3030  string alert_msg;
3031  for (vector<string>::const_iterator k = config_warnings.begin();
3032  k != config_warnings.end(); ++k) {
3033  ERR_POST(*k);
3034  if (!msg.empty()) {
3035  msg += "; ";
3036  alert_msg += "\n";
3037  }
3038  msg += *k;
3039  alert_msg += *k;
3040  }
3041  m_Server->RegisterAlert(eReconfigure, alert_msg);
3043 
3044  msg = "ERR:eInvalidParameter:Configuration file is not "
3045  "well formed. " + msg;
3046  if (msg.size() > 1024) {
3047  msg.resize(1024);
3048  msg += " TRUNCATED";
3049  }
3050 
3053  return;
3054  }
3055 
3056  // Update the config file checksum in memory
3057  vector<string> config_checksum_warnings;
3058  string config_checksum = NS_GetConfigFileChecksum(
3059  app->GetConfigPath(), config_checksum_warnings);
3060  if (config_checksum_warnings.empty()) {
3061  m_Server->SetRAMConfigFileChecksum(config_checksum);
3062  m_Server->SetDiskConfigFileChecksum(config_checksum);
3063  } else {
3064  for (vector<string>::const_iterator
3065  k = config_checksum_warnings.begin();
3066  k != config_checksum_warnings.end(); ++k)
3067  ERR_POST(*k);
3068  }
3069 
3070  // Logging from the [server] section
3071  SNS_Parameters params;
3072  params.Read(reg);
3073 
3074  CJsonNode what_changed = m_Server->SetNSParameters(params, true);
3075  CJsonNode services_changed = m_Server->ReadServicesConfig(reg);
3076 
3078  m_Server->Configure(reg, diff);
3080 
3081  if (what_changed.GetSize() == 0 &&
3082  diff.GetSize() == 0 &&
3083  services_changed.GetSize() == 0) {
3084  m_Server->AcknowledgeAlert(eReconfigure, "NSAcknowledge");
3085  m_Server->AcknowledgeAlert(eConfigOutOfSync, "NSAcknowledge");
3086  if (x_NeedCmdLogging())
3087  GetDiagContext().Extra().Print("accepted_changes", "none");
3088  x_WriteMessage("OK:WARNING:eNoParametersChanged:No changes in "
3089  "changeable parameters were identified in the new "
3090  "cofiguration file;" + kEndOfResponse);
3092  return;
3093  }
3094 
3095  // Merge the changes
3096  for (CJsonIterator k = what_changed.Iterate(); k; ++k)
3097  diff.SetByKey(k.GetKey(), k.GetNode());
3098  for (CJsonIterator k = services_changed.Iterate(); k; ++k)
3099  diff.SetByKey(k.GetKey(), k.GetNode());
3100 
3101  string diff_as_string = diff.Repr();
3102  if (x_NeedCmdLogging())
3103  GetDiagContext().Extra().Print("config_changes", diff_as_string);
3104 
3105  m_Server->AcknowledgeAlert(eReconfigure, "NSAcknowledge");
3106  m_Server->AcknowledgeAlert(eConfigOutOfSync, "NSAcknowledge");
3107  x_WriteMessage("OK:" + diff_as_string + kEndOfResponse);
3108  }
3109  else
3110  x_WriteMessage("OK:WARNING:eConfigFileNotChanged:Configuration "
3111  "file has not been changed, RECO ignored;" +
3112  kEndOfResponse);
3113 
3115 }
3116 
3117 
3119 {
3120  string active_jobs = to_string(m_Server->CountActiveJobs());
3121 
3122  x_WriteMessage("OK:" + active_jobs + kEndOfResponse);
3124 }
3125 
3126 
3128 {
3129  if (m_CommandArguments.job_id != 0 &&
3130  (!m_CommandArguments.group.empty() ||
3133  if (x_NeedCmdLogging())
3134  ERR_POST(Warning << "DUMP can accept either a job key or no "
3135  "parameters or any combination of a group and "
3136  "an affinity and job statuses");
3138  x_WriteMessage("ERR:eInvalidParameter:DUMP can accept either a job "
3139  "key or no parameters or any combination of a group and "
3140  "an affinity and job statuses" + kEndOfResponse);
3142  return;
3143  }
3144 
3145  if (m_CommandArguments.job_id == 0) {
3146  // The whole queue dump, may be restricted by a list of statuses
3147  if (!m_CommandArguments.job_statuses_string.empty()) {
3148  bool reported = false;
3149  vector<TJobStatus>::iterator k =
3151  while (k != m_CommandArguments.job_statuses.end() ) {
3152  if (*k == CNetScheduleAPI::eJobNotFound) {
3153  if (!reported) {
3154  if (x_NeedCmdLogging())
3155  ERR_POST(Warning <<
3156  "Unknown job status in the status list. "
3157  "Ignore and continue.");
3158  reported = true;
3159  }
3160  k = m_CommandArguments.job_statuses.erase(k);
3161  } else
3162  ++k;
3163  }
3164  }
3165 
3166  vector<string> warnings; // Not used here: multiline output
3167  // commands do not have a place for
3168  // warnings
3169  vector<TJobStatus> statuses = x_RemoveDuplicateStatuses(
3170  m_CommandArguments.job_statuses, warnings);
3171 
3172  // Check for a special case: statuses were given however all of them
3173  // are unknown
3174  if (!m_CommandArguments.job_statuses_string.empty() &&
3175  statuses.size() == 0)
3176  x_WriteMessage("OK:END" + kEndOfResponse);
3177  else
3182  statuses,
3187  x_NeedCmdLogging()) +
3188  "OK:END" + kEndOfResponse);
3190  return;
3191  }
3192 
3193 
3194  // Certain job dump
3195  string job_info = q->PrintJobDbStat(m_ClientId,
3198  if (job_info.empty()) {
3199  // Nothing was printed because there is no such a job
3202  } else
3203  x_WriteMessage(job_info + "OK:END" + kEndOfResponse);
3204 
3206 }
3207 
3208 
3210 {
3211  if (m_CommandArguments.drain) {
3212  if (m_Server->ShutdownRequested()) {
3214  x_WriteMessage("ERR:eShuttingDown:The server is in "
3215  "shutting down state" + kEndOfResponse);
3217  return;
3218  }
3219  if (m_Server->IsDrainShutdown()) {
3220  x_WriteMessage("OK:WARNING:eAlreadyDrainShutdown:The server is "
3221  "already in drain shutdown state;" +
3222  kEndOfResponse);
3224  } else {
3227  m_Server->SetRefuseSubmits(true);
3229  }
3230  return;
3231  }
3232 
3233  // Unconditional immediate shutdown.
3236  m_Server->SetRefuseSubmits(true);
3238 }
3239 
3240 
3242 {
3243  string configuration;
3244 
3246  // The effective config (some parameters could be altered
3247  // at run-time) has been requested
3248  configuration = x_GetServerSection() +
3249  x_GetLogSection() +
3250  x_GetDiagSection() +
3256  } else {
3257  // The original config file (the one used at the startup)
3258  // has been requested
3259  CNcbiOstrstream conf;
3260  CNcbiOstrstreamToString converter(conf);
3261 
3263  configuration = string(converter);
3264  }
3265  x_WriteMessage(configuration + "OK:END" + kEndOfResponse);
3267 }
3268 
3269 
3271 {
3272  // Further NS versions should exclude ns_node, ns_session and pid from the
3273  // VERSION command in favor of the HEALTH command
3274  static string reply =
3275  "OK:server_version=" NETSCHEDULED_VERSION
3276  "&storage_version=" NETSCHEDULED_STORAGE_VERSION
3277  "&protocol_version=" NETSCHEDULED_PROTOCOL_VERSION
3278  "&build_date=" + NStr::URLEncode(NETSCHEDULED_BUILD_DATE) +
3279  "&ns_node=" + m_Server->GetNodeID() +
3280  "&ns_session=" + m_Server->GetSessionID() +
3281  "&pid=" + to_string(CDiagContext::GetPID()) +
3283  x_WriteMessage(reply);
3285 }
3286 
3287 
3289 {
3290  double real_time;
3291  double user_time;
3292  double system_time;
3293  bool process_time_result = CCurrentProcess::GetTimes(&real_time,
3294  &user_time,
3295  &system_time);
3296  Uint8 physical_memory = CSystemInfo::GetTotalPhysicalMemorySize();
3297 
3298  CProcessBase::SMemoryUsage mem_used;
3299  bool mem_used_result =
3301 
3302  int proc_fd_soft_limit;
3303  int proc_fd_hard_limit;
3304  int proc_fd_used = CCurrentProcess::GetFileDescriptorsCount(
3305  &proc_fd_soft_limit,
3306  &proc_fd_hard_limit);
3307  int proc_thread_count = CCurrentProcess::GetThreadCount();
3308 
3309  #if defined(_DEBUG) && !defined(NDEBUG)
3310  if (m_CmdContext.NotNull()) {
3312 
3313  if (err_emul.IsActive())
3314  if (err_emul.as_int >= 0)
3315  proc_fd_used = err_emul.as_int;
3316 
3317  err_emul = m_Server->GetDebugMemCount();
3318  if (err_emul.IsActive())
3319  if (err_emul.as_int >= 0)
3320  mem_used.total = err_emul.as_int;
3321  }
3322  #endif
3323 
3324  string reply =
3325  "OK:pid=" +
3326  to_string(CDiagContext::GetPID()) +
3327  "&ns_node=" +
3328  m_Server->GetNodeID() +
3329  "&ns_session=" +
3330  m_Server->GetSessionID() +
3331  "&started=" +
3333  "&cpu_count=" +
3334  to_string(CSystemInfo::GetCpuCount());
3335  if (process_time_result)
3336  reply += "&user_time=" + to_string(user_time) +
3337  "&system_time=" + to_string(system_time) +
3338  "&real_time=" + to_string(real_time);
3339  else
3340  reply += "&user_time=n/a&system_time=n/a&real_time=n/a";
3341 
3342  if (physical_memory > 0)
3343  reply += "&physical_memory=" + to_string(physical_memory);
3344  else
3345  reply += "&physical_memory=n/a";
3346 
3347  if (mem_used_result)
3348  reply += "&mem_used_total=" + to_string(mem_used.total) +
3349  "&mem_used_total_peak=" + to_string(mem_used.total_peak) +
3350  "&mem_used_resident=" + to_string(mem_used.resident) +
3351  "&mem_used_resident_peak=" + to_string(mem_used.resident_peak) +
3352  "&mem_used_shared=" + to_string(mem_used.shared) +
3353  "&mem_used_data=" + to_string(mem_used.data) +
3354  "&mem_used_stack=" + to_string(mem_used.stack) +
3355  "&mem_used_text=" + to_string(mem_used.text) +
3356  "&mem_used_lib=" + to_string(mem_used.lib) +
3357  "&mem_used_swap=" + to_string(mem_used.swap);
3358  else
3359  reply += "&mem_used_total=n/a"
3360  "&mem_used_total_peak=n/a"
3361  "&mem_used_resident=n/a"
3362  "&mem_used_resident_peak=n/a"
3363  "&mem_used_shared=n/a"
3364  "&mem_used_data=n/a"
3365  "&mem_used_stack=n/a"
3366  "&mem_used_text=n/a"
3367  "&mem_used_lib=n/a"
3368  "&mem_used_swap=n/a";
3369 
3370  if (proc_fd_soft_limit >= 0)
3371  reply += "&proc_fd_soft_limit=" + to_string(proc_fd_soft_limit);
3372  else
3373  reply += "&proc_fd_soft_limit=n/a";
3374 
3375  if (proc_fd_hard_limit >= 0)
3376  reply += "&proc_fd_hard_limit=" + to_string(proc_fd_hard_limit);
3377  else
3378  reply += "&proc_fd_hard_limit=n/a";
3379 
3380  if (proc_fd_used >= 0)
3381  reply += "&proc_fd_used=" + to_string(proc_fd_used);
3382  else
3383  reply += "&proc_fd_used=n/a";
3384 
3385  if (proc_thread_count >= 1)
3386  reply += "&proc_thread_count=" + to_string(proc_thread_count);
3387  else
3388  reply += "&proc_thread_count=n/a";
3389 
3390  string alerts = m_Server->GetAlerts();
3391  if (!alerts.empty())
3392  reply += "&" + alerts;
3393 
3394  // Add an executable path, config file path and command line arguments:
3395  // see CXX-9139
3397  reply += "&exe_path=" + NStr::URLEncode(app->GetProgramExecutablePath());
3398  reply += "&config_path=" + NStr::URLEncode(app->GetConfigPath());
3399 
3400  const CNcbiArguments& arguments = app->GetArguments();
3401  size_t args_size = arguments.Size();
3402  string cmdline_args;
3403  for (size_t index = 0; index < args_size; ++index) {
3404  if (index != 0)
3405  cmdline_args += " ";
3406  cmdline_args += arguments[index];
3407  }
3408  reply += "&cmdline_args=" + NStr::URLEncode(cmdline_args);
3409 
3410  x_WriteMessage(reply + kEndOfResponse);
3412 }
3413 
3414 
3416 {
3417  enum EAlertAckResult result =
3420  switch (result) {
3421  case eNotFound:
3422  x_WriteMessage("OK:WARNING:eAlertNotFound:Alert has not been "
3423  "found;" + kEndOfResponse);
3424  break;
3425  case eAlreadyAcknowledged:
3426  x_WriteMessage("OK:WARNING:eAlertAlreadyAcknowledged:Alert has "
3427  "already been acknowledged;" + kEndOfResponse);
3428  break;
3429  default:
3431  }
3433 }
3434 
3435 
3437 {
3440 }
3441 
3442 
3444 {
3446 }
3447 
3448 
3450 {
3451  // program and submitter restrictions must be checked for the queue class
3453  m_ClientId,
3459 }
3460 
3461 
3463 {
3464  // program and submitter restrictions must be checked for the queue to
3465  // be deleted
3469 }
3470 
3471 
3473 {
3474  bool cmdv2(m_CommandArguments.cmd == "QINF2");
3475 
3476  if (cmdv2) {
3478 
3479  string qname = m_CommandArguments.qname;
3480  if (!m_CommandArguments.service.empty()) {
3481  // Service has been provided, need to resolve it to a queue
3483  if (qname.empty()) {
3485  x_WriteMessage("ERR:eUnknownService:Cannot resolve service " +
3486  m_CommandArguments.service + " to a queue" +
3487  kEndOfResponse);
3489  return;
3490  }
3491  }
3492 
3493  SQueueParameters params = m_Server->QueueInfo(qname);
3494  CRef<CQueue> queue_ref;
3495  CQueue * queue_ptr;
3496  size_t jobs_per_state[g_ValidJobStatusesSize];
3497  string jobs_part;
3498  string linked_sections_part;
3499  size_t total = 0;
3500  map< string,
3501  map<string, string> > linked_sections;
3502  vector<string> warnings;
3503 
3504  queue_ref.Reset(m_Server->OpenQueue(qname));
3505  queue_ptr = queue_ref.GetPointer();
3506  queue_ptr->GetJobsPerState(m_ClientId, "", "",
3507  jobs_per_state, warnings);
3508  queue_ptr->GetLinkedSections(linked_sections);
3509 
3510  for (size_t index(0); index < g_ValidJobStatusesSize; ++index) {
3511  jobs_part.append(1, '&')
3513  .append(1, '=')
3514  .append(to_string(jobs_per_state[index]));
3515  total += jobs_per_state[index];
3516  }
3517  jobs_part.append("&Total=")
3518  .append(to_string(total));
3519 
3520  for (map< string, map<string, string> >::const_iterator
3521  k = linked_sections.begin(); k != linked_sections.end(); ++k) {
3522  string prefix((k->first).c_str() + strlen("linked_section_"));
3523  for (map<string, string>::const_iterator j = k->second.begin();
3524  j != k->second.end(); ++j) {
3525  linked_sections_part.append(1, '&')
3526  .append(prefix)
3527  .append(1, '.')
3528  .append(NStr::URLEncode(j->first))
3529  .append(1, '=')
3530  .append(NStr::URLEncode(j->second));
3531  }
3532  }
3533  string qname_part;
3534  if (!m_CommandArguments.service.empty())
3535  qname_part.append("queue_name=")
3536  .append(qname)
3537  .append(1, '&');
3538 
3539  // Include queue classes and use URL encoding
3540  x_WriteMessage("OK:" + qname_part +
3541  params.GetPrintableParameters(true, true) +
3542  jobs_part + linked_sections_part + kEndOfResponse);
3543  } else {
3546  x_WriteMessage("OK:" + to_string(params.kind) + "\t" +
3547  params.qclass + "\t\"" +
3548  NStr::PrintableString(params.description) + "\"" +
3549  kEndOfResponse);
3550  }
3551 
3553 }
3554 
3555 
3557 {
3558  if (m_CommandArguments.qname.empty() ||
3559  NStr::CompareNocase(m_CommandArguments.qname, "noname") == 0) {
3560  // Disconnecting from all the queues
3562  m_QueueName.clear();
3563 
3565 
3568  return;
3569  }
3570 
3571  // Here: connecting to another queue
3572 
3573  CRef<CQueue> queue_ref;
3574  CQueue * queue_ptr = NULL;
3575 
3576  // First, deal with the given queue - try to resolve it
3577  try {
3579  queue_ptr = queue_ref.GetPointer();
3580  }
3581  catch (...) {
3583  x_WriteMessage("ERR:eUnknownQueue:" + kEndOfResponse);
3585  return;
3586  }
3587 
3588  // Second, update the client with its capabilities for the new queue
3589  x_UpdateClientPassedChecks(queue_ptr);
3590 
3591  // Note:
3592  // The m_ClientId.CheckAccess(...) call will take place when a command
3593  // for the changed queue is executed.
3594 
3595 
3596  {
3597  // The client has appeared for the queue - touch the client registry
3598  bool client_was_found = false;
3599  bool session_was_reset = false;
3600  string old_session;
3601  bool had_wn_pref_affs = false;
3602  bool had_reader_pref_affs = false;
3603 
3604  queue_ptr->TouchClientsRegistry(m_ClientId, client_was_found,
3605  session_was_reset, old_session,
3606  had_wn_pref_affs,
3607  had_reader_pref_affs);
3608  if (client_was_found && session_was_reset) {
3609  if (x_NeedCmdLogging()) {
3610  string wn_val = "true";
3611  if (!had_wn_pref_affs)
3612  wn_val = "had none";
3613  string reader_val = "true";
3614  if (!had_reader_pref_affs)
3615  reader_val = "had none";
3616 
3617  GetDiagContext().Extra()
3618  .Print("client_node", m_ClientId.GetNode())
3619  .Print("client_session", m_ClientId.GetSession())
3620  .Print("client_old_session", old_session)
3621  .Print("wn_preferred_affinities_reset", wn_val)
3622  .Print("reader_preferred_affinities_reset", reader_val);
3623  }
3624  }
3625  }
3626 
3627  // Final step - update the current queue reference
3628 
3629  m_QueueRef.Reset(queue_ref);
3631 
3634 }
3635 
3636 
3638 {
3639  size_t used_slots = q->GetScopeSlotsUsed();
3640  unsigned int max_slots = m_Server->GetScopeRegistrySettings().max_records;
3641 
3642  if (used_slots >= max_slots) {
3643  ERR_POST("All scope slots are in use");
3645  x_WriteMessage("ERR:eInternalError:All scope slots are in use" +
3646  kEndOfResponse);
3648  return;
3649  }
3650 
3651  // Here: at the moment there are available scope slots, so let it go.
3656 }
3657 
3658 
3660 {
3661  unsigned int max_input_size;
3662  unsigned int max_output_size;
3663  map< string,
3664  map<string, string> > linked_sections;
3665 
3666  q->GetMaxIOSizesAndLinkedSections(max_input_size, max_output_size,
3667  linked_sections);
3668 
3669  if (m_CommandArguments.cmd == "GETP2") {
3670  string result("OK:max_input_size=" +
3671  to_string(max_input_size) + "&" +
3672  "max_output_size=" +
3673  to_string(max_output_size));
3674 
3675  for (map< string, map<string, string> >::const_iterator
3676  k = linked_sections.begin(); k != linked_sections.end(); ++k) {
3677  string prefix((k->first).c_str() + strlen("linked_section_"));
3678  for (map<string, string>::const_iterator j = k->second.begin();
3679  j != k->second.end(); ++j) {
3680  result += "&" + prefix + "::" +
3681  NStr::URLEncode(j->first) + "=" +
3682  NStr::URLEncode(j->second);
3683  }
3684  }
3686  } else {
3687  x_WriteMessage("OK:max_input_size=" + to_string(max_input_size) + ";"
3688  "max_output_size=" + to_string(max_output_size) + ";" +
3690  }
3692 }
3693 
3694 
3696 {
3697  CQueue::TParameterList parameters = q->GetParameters();
3698  string configuration;
3699 
3700  ITERATE(CQueue::TParameterList, it, parameters) {
3701  configuration += "OK:" + it->first + '=' + it->second + kEndOfResponse;
3702  }
3703  x_WriteMessage(configuration + "OK:END" + kEndOfResponse);
3705 }
3706 
3707 
3709 {
3710  bool cmdv2(m_CommandArguments.cmd == "READ2");
3711 
3712  x_CheckNonAnonymousClient("use " + m_CommandArguments.cmd + " command");
3714 
3715  if (cmdv2)
3717  else {
3718  // Affinity flags are for the second version of the command
3721 
3722  // This flag mimics the GET command behavior
3725  }
3726 
3727  CJob job;
3728  bool no_more_jobs = true;
3729  string added_pref_aff;
3730 
3731  list<string> aff_list;
3732  NStr::Split(m_CommandArguments.affinity_token, "\t,", aff_list);
3733  list<string> group_list;
3734  NStr::Split(m_CommandArguments.group,"\t,", group_list);
3735 
3740  &aff_list,
3745  &group_list,
3748  &job,
3749  &no_more_jobs, m_RollbackAction,
3750  added_pref_aff) == false) {
3751  // Preferred affinities were reset for the client, so no job
3752  // and bad request
3754  x_WriteMessage("ERR:ePrefAffExpired:" + kEndOfResponse);
3755  } else {
3756  unsigned int job_id = job.GetId();
3757  string job_key;
3758 
3759  if (job_id) {
3760  job_key = q->MakeJobKey(job_id);
3761  x_WriteMessage("OK:job_key=" + job_key +
3762  "&client_ip=" + NStr::URLEncode(job.GetClientIP()) +
3763  "&client_sid=" + NStr::URLEncode(job.GetClientSID()) +
3764  "&ncbi_phid=" + NStr::URLEncode(job.GetNCBIPHID()) +
3765  "&auth_token=" + job.GetAuthToken() +
3766  "&status=" +
3768  job.GetStatusBeforeReading()) +
3769  "&affinity=" +
3771  job.GetAffinityId())) +
3772  kEndOfResponse);
3774  x_LogCommandWithJob(job);
3775 
3776  if (!added_pref_aff.empty()) {
3777  if (x_NeedCmdLogging()) {
3779  GetDiagContext().Extra()
3780  .Print("added_preferred_affinity", added_pref_aff);
3781  else
3782  GetDiagContext().Extra()
3783  .Print("client_node", m_ClientId.GetNode())
3784  .Print("client_session", m_ClientId.GetSession())
3785  .Print("added_preferred_affinity", added_pref_aff);
3786  }
3787  }
3788  }
3789  else
3790  x_WriteMessage("OK:no_more_jobs=" +
3791  NStr::BoolToString(no_more_jobs) +
3792  kEndOfResponse);
3793 
3794  if (x_NeedCmdLogging()) {
3795  if (job_id)
3796  GetDiagContext().Extra().Print("job_key", job_key);
3797  else
3798  GetDiagContext().Extra().Print("job_key", "None")
3799  .Print("no_more_jobs", no_more_jobs);
3800  }
3801  }
3802 
3804 }
3805 
3806 
3808 {
3809  x_CheckNonAnonymousClient("use CFRM command");
3811 
3812  CJob job;
3813  TJobStatus old_status = q->ConfirmReadingJob(
3814  m_ClientId,
3817  job,
3819  x_FinalizeReadCommand("CFRM", old_status, job);
3820 }
3821 
3822 
3824 {
3825  x_CheckNonAnonymousClient("use FRED command");
3827 
3828  CJob job;
3829  TJobStatus old_status = q->FailReadingJob(
3830  m_ClientId,
3833  job,
3837  x_FinalizeReadCommand("FRED", old_status, job);
3838 }
3839 
3840 
3842 {
3843  x_CheckNonAnonymousClient("use RDRB command");
3845 
3846  CJob job;
3847  TJobStatus old_status = q->ReturnReadingJob(
3848  m_ClientId,
3851  job,
3853  false,
3856  x_FinalizeReadCommand("RDRB", old_status, job);
3857 }
3858 
3859 
3861 {
      // Handles the "REREAD" command via q->RereadJob(); the queue also
      // reports through `no_op` whether there was nothing to do.
      // Reply selection:
      //  - job not found               -> warning logged, early return
      //                                   (the command is not logged with
      //                                   a job in this case)
      //  - job Pending/Running/Reading -> ERR:eInvalidJobStatus
      //  - no_op set by the queue      -> OK:WARNING:eJobNotRead
      //  - otherwise                   -> reply from the final else branch
      // In all but the "not found" case the command is logged with the job.
3862  x_CheckNonAnonymousClient("use REREAD command");
3863 
3864  CJob job;
3865  bool no_op = false;
3866  TJobStatus old_status = q->RereadJob(m_ClientId,
3869  job,
3870  no_op);
3871 
3872  if (old_status == CNetScheduleAPI::eJobNotFound) {
3873  ERR_POST(Warning << "REREAD for unknown job "
3878  return;
3879  }
3880 
      // A job that is still pending, running or being read cannot be reread.
3881  if (old_status == CNetScheduleAPI::ePending ||
3882  old_status == CNetScheduleAPI::eRunning ||
3883  old_status == CNetScheduleAPI::eReading) {
3884  ERR_POST(Warning << "Cannot reread job "
3886  << "; job is in "
3887  << CNetScheduleAPI::StatusToString(old_status)
3888  << " state");
3890  x_WriteMessage("ERR:eInvalidJobStatus:Cannot reread job; job is in " +
3891  CNetScheduleAPI::StatusToString(old_status) + " state" +
3892  kEndOfResponse);
3893  } else if (no_op) {
3894  ERR_POST(Warning << "Cannot reread job "
3896  << "; job has not been read yet");
3897  x_WriteMessage("OK:WARNING:eJobNotRead:The job has not been read yet;" +
3898  kEndOfResponse);
3899  } else {
3901  }
3902 
3903  x_LogCommandWithJob(job);
3905 }
3906 
3907 
3909  TJobStatus old_status,
3910  const CJob & job)
3911 {
      // Common tail of the CFRM / FRED / RDRB read-completion commands.
      // `command` is the protocol command name, `old_status` is the status
      // returned by the queue operation and `job` the affected job.
      //  - eJobNotFound -> warning logged, early return (no job logging)
      //  - not eReading -> warning logged and ERR:eInvalidJobStatus reply;
      //                    the operation word is derived from `command`
      //  - eReading     -> reply from the final else branch
      // Afterwards the command is logged together with the job.
      // NOTE(review): the log text says "Cannot <op> read job" while the
      // client reply says "Cannot <op> job"; consider aligning the wording.
3912  if (old_status == CNetScheduleAPI::eJobNotFound) {
3913  ERR_POST(Warning << command << " for unknown job "
3918  return;
3919  }
3920 
3921  if (old_status != CNetScheduleAPI::eReading) {
      // Human readable operation name for the diagnostics below.
3922  string operation = "unknown";
3923 
3924  if (command == "CFRM") operation = "confirm";
3925  else if (command == "FRED") operation = "fail";
3926  else if (command == "RDRB") operation = "rollback";
3927 
3928  ERR_POST(Warning << "Cannot " << operation
3929  << " read job; job is in "
3930  << CNetScheduleAPI::StatusToString(old_status)
3931  << " state");
3933  x_WriteMessage("ERR:eInvalidJobStatus:Cannot " +
3934  operation + " job; job is in " +
3935  CNetScheduleAPI::StatusToString(old_status) + " state" +
3936  kEndOfResponse);
3937  } else
3939 
3940  x_LogCommandWithJob(job);
3942 }
3943 
3944 
3946 {
      // Stores client-supplied data (m_CommandArguments.client_data) for
      // m_ClientId in the queue. Data longer than the server-wide limit
      // (m_Server->GetMaxClientData()) is rejected with
      // ERR:eInvalidParameter; otherwise the reply carries the data version
      // returned by q->SetClientData() ("OK:version=<n>").
      // NOTE(review): size() is narrowed to unsigned int below; harmless for
      // realistic payloads, but size_t would avoid the implicit conversion.
3947  unsigned int limit = m_Server->GetMaxClientData();
3948  unsigned int data_size = m_CommandArguments.client_data.size();
3949 
3950  if (data_size > limit) {
3951  ERR_POST(Warning << "Client data is too long. It must be <= "
3952  << limit
3953  << " bytes. Received "
3954  << data_size
3955  << " bytes.");
3957  x_WriteMessage("ERR:eInvalidParameter:Client data is too long. "
3958  "It must be <= " + to_string(limit) +
3959  " bytes. Received " + to_string(data_size) +
3960  " bytes." + kEndOfResponse);
3961  } else {
3962  int current_data_version = q->SetClientData(m_ClientId,
3965  x_WriteMessage("OK:version=" + to_string(current_data_version) +
3966  kEndOfResponse);
3967  }
3969 }
3970 
3971 
3973 {
      // Clears the worker-node related state of m_ClientId via
      // q->ClearWorkerNode(). The out-parameters tell whether the client was
      // known, its previous session and whether it had worker-node / reader
      // preferred affinities; when the client was found and command logging
      // is enabled they are printed as extra log fields.
3974  bool client_found = false;
3975  bool had_wn_pref_affs = false;
3976  bool had_reader_pref_affs = false;
3977  string old_session;
3978 
3979  q->ClearWorkerNode(m_ClientId, client_found,
3980  old_session, had_wn_pref_affs, had_reader_pref_affs);
3981 
3982  if (client_found) {
3983  if (x_NeedCmdLogging()) {
      // "true" if the corresponding affinities were reset, "had none"
      // if there was nothing to reset.
3984  string wn_val = "true";
3985  if (!had_wn_pref_affs)
3986  wn_val = "had none";
3987  string reader_val = "true";
3988  if (!had_reader_pref_affs)
3989  reader_val = "had none";
3990 
3991  GetDiagContext().Extra()
3992  .Print("client_node", m_ClientId.GetNode())
3993  .Print("client_session", m_ClientId.GetSession())
3994  .Print("client_old_session", old_session)
3995  .Print("wn_preferred_affinities_reset", wn_val)
3996  .Print("reader_preferred_affinities_reset", reader_val);
3997  }
3998  }
3999 
4002 }
4003 
4004 
4006 {
      // Cancels all jobs of the queue on behalf of m_ClientId and replies
      // "OK:<count>" with the count returned by q->CancelAllJobs()
      // (presumably the number of cancelled jobs).
4007  unsigned int count = q->CancelAllJobs(m_ClientId, x_NeedCmdLogging());
4008  x_WriteMessage("OK:" + to_string(count) + kEndOfResponse);
4010 }
4011 
4012 
4014 {
      // Switches the refusal of job submits on or off, depending on
      // m_CommandArguments.mode (false presumably means "accept submits
      // again" - TODO confirm).
      //  - mode == false under an additional condition (presumably the
      //    server being in drained shutdown) -> ERR:eShuttingDown
      //  - q == NULL   -> the request applies to the whole server
      //  - otherwise   -> the request applies to queue q; re-enabling
      //    submits on a queue while the server still refuses them yields
      //    an OK:WARNING:eSubmitsDisabledForServer reply.
4015  if (m_CommandArguments.mode == false &&
4018  x_WriteMessage("ERR:eShuttingDown:"
4019  "Server is in drained shutting down state" +
4020  kEndOfResponse);
4022  return;
4023  }
4024 
4025  if (q == NULL) {
4026  // This is a whole server scope request
4031  return;
4032  }
4033 
4034  // This is a queue scope request.
4037 
4038  if (m_CommandArguments.mode == false &&
4039  m_Server->GetRefuseSubmits() == true)
4040  x_WriteMessage("OK:WARNING:eSubmitsDisabledForServer:Submits are "
4041  "disabled on the server level;" + kEndOfResponse);
4042  else
4046 }
4047 
4048 
4050 {
      // Pauses the queue. The new pause mode (with or without pullback) is
      // selected from the request - presumably m_CommandArguments.pullback,
      // TODO confirm - and stored via q->SetPauseStatus(). If the queue was
      // already paused, the reply is an OK:WARNING:eQueueAlreadyPaused
      // message reporting both the previous and the new pullback values.
4051  CQueue::TPauseStatus current = q->GetPauseStatus();
4052  CQueue::TPauseStatus new_value;
4053 
4055  new_value = CQueue::ePauseWithPullback;
4056  else
4057  new_value = CQueue::ePauseWithoutPullback;
4058 
4059  q->SetPauseStatus(m_ClientId, new_value);
4060  if (current == CQueue::eNoPause)
4062  else {
4063  string reply = "OK:WARNING:eQueueAlreadyPaused:The queue has "
4064  "already been paused (previous pullback value is ";
4065  if (current == CQueue::ePauseWithPullback) reply += "true";
4066  else reply += "false";
4067  reply += ", new pullback value is ";
4068  if (m_CommandArguments.pullback) reply += "true";
4069  else reply += "false";
4070  x_WriteMessage(reply + ");" + kEndOfResponse);
4071  }
4073 }
4074 
4075 
4077 {
      // Resumes the queue. The pause status is sampled first; if the queue
      // was not paused the reply is OK:WARNING:eQueueNotPaused, otherwise
      // the reply comes from the else branch.
4078  CQueue::TPauseStatus current = q->GetPauseStatus();
4079 
4081  if (current == CQueue::eNoPause)
4082  x_WriteMessage("OK:WARNING:eQueueNotPaused:"
4083  "The queue is not paused;" + kEndOfResponse);
4084  else
4087 }
4088 
4089 
4091 {
      // Rejects the command with an ERR:eObsoleteCommand reply.
4093  x_WriteMessage("ERR:eObsoleteCommand:" + kEndOfResponse);
4095 }
4096 
4097 
4099 {
      // Accepts an obsolete command without acting on it: the reply is an
      // OK:WARNING:eCommandObsolete warning saying it will be ignored.
4101  x_WriteMessage("OK:WARNING:eCommandObsolete:"
4102  "Command is obsolete and will be ignored;" +
4103  kEndOfResponse);
4105 }
4106 
4107 
4109 {
      // Throws CNetScheduleException(eInvalidClient) when the client did not
      // identify itself with both client_node and client_session at
      // handshake (m_ClientId.IsComplete() is false). `message` completes
      // the sentence "... cannot <message>", e.g. "use CFRM command".
4110  if (!m_ClientId.IsComplete())
4111  NCBI_THROW(CNetScheduleException, eInvalidClient,
4112  "Anonymous client (no client_node and client_session"
4113  " at handshake) cannot " + message);
4114 }
4115 
4116 
4118 {
      // Validates that the port and timeout arguments are either both zero
      // or both non-zero; any other combination throws
      // CNetScheduleException(eInvalidParameter).
4119  if ((m_CommandArguments.port != 0 &&
4120  m_CommandArguments.timeout == 0) ||
4121  (m_CommandArguments.port == 0 &&
4123  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4124  "Either both or neither of the port and "
4125  "timeout parameters must be 0");
4126 }
4127 
4128 
4130 {
      // Rejects an empty authorization token by throwing
      // CNetScheduleException(eInvalidAuthToken).
4131  if (m_CommandArguments.auth_token.empty())
4132  NCBI_THROW(CNetScheduleException, eInvalidAuthToken,
4133  "Invalid authorization token. It cannot be empty.");
4134 }
4135 
4136 
4138 {
4139  // Checks that the given GETx/JXCG parameters make sense
      // A request that can never match any job only produces a warning;
      // mutually exclusive flag combinations throw
      // CNetScheduleException(eInvalidParameter):
      //  - exclusive_new_aff together with any_affinity
      //  - prioritized_aff together with wnode_aff
      //  - prioritized_aff together with exclusive_new_aff
      //  - prioritized_aff without explicit affinities
      // NOTE(review): the first error message calls the flag "any_affinity"
      // while the READ2 counterpart says "any_aff"; consider aligning.
4140  if (m_CommandArguments.wnode_affinity == false &&
4141  m_CommandArguments.any_affinity == false &&
4144  ERR_POST(Warning << "The job request without explicit affinities, "
4145  "without preferred affinities and "
4146  "with any_aff flag set to false "
4147  "will never match any job.");
4148  }
4149  if (m_CommandArguments.exclusive_new_aff == true &&
4151  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4152  "It is forbidden to have both any_affinity and "
4153  "exclusive_new_aff GET2 flags set to 1.");
4154  if (m_CommandArguments.prioritized_aff == true &&
4156  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4157  "It is forbidden to have both prioritized_aff and "
4158  "wnode_aff GET2 flags set to 1.");
4159  if (m_CommandArguments.prioritized_aff == true &&
4161  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4162  "It is forbidden to have both prioritized_aff and "
4163  "exclusive_new_aff GET2 flags set to 1.");
4164  if (m_CommandArguments.prioritized_aff == true &&
4166  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4167  "If the prioritized_aff GET2 flag set to 1 then "
4168  "a non empty list of explicit affinities must be provided.");
4169 }
4170 
4171 
4173 {
4174  // Checks that the given READ parameters make sense
      // Mirrors the GET2 checks for readers: a request that can never match
      // any job only produces a warning, while these flag combinations throw
      // CNetScheduleException(eInvalidParameter):
      //  - exclusive_new_aff together with any_aff
      //  - prioritized_aff together with reader_aff
      //  - prioritized_aff together with exclusive_new_aff
      //  - prioritized_aff without explicit affinities
4175  if (m_CommandArguments.reader_affinity == false &&
4176  m_CommandArguments.any_affinity == false &&
4179  ERR_POST(Warning << "The job read request without explicit affinities, "
4180  "without preferred affinities and "
4181  "with any_aff flag set to false "
4182  "will never match any job.");
4183  }
4184  if (m_CommandArguments.exclusive_new_aff == true &&
4186  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4187  "It is forbidden to have both any_aff and "
4188  "exclusive_new_aff READ2 flags set to 1.");
4189  if (m_CommandArguments.prioritized_aff == true &&
4191  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4192  "It is forbidden to have both prioritized_aff and "
4193  "reader_aff READ2 flags set to 1.");
4194  if (m_CommandArguments.prioritized_aff == true &&
4196  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4197  "It is forbidden to have both prioritized_aff and "
4198  "exclusive_new_aff READ2 flags set to 1.");
4199  if (m_CommandArguments.prioritized_aff == true &&
4201  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4202  "If the prioritized_aff READ2 flag set to 1 then "
4203  "a non empty list of explicit affinities must be provided.");
4204 }
4205 
4206 
4208 {
4209  // One of the arguments must be provided: qname or service
      // Exactly one is accepted: neither or both throws
      // CNetScheduleException(eInvalidParameter).
4210  if (m_CommandArguments.qname.empty() &&
4211  m_CommandArguments.service.empty())
4212  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4213  "QINF2 command expects a queue name or a service name. "
4214  "Nothing has been provided.");
4215 
4216  if (!m_CommandArguments.qname.empty() &&
4217  !m_CommandArguments.service.empty())
4218  NCBI_THROW(CNetScheduleException, eInvalidParameter,
4219  "QINF2 command expects only one value: queue name or "
4220  "a service name. Both have been provided.");
4221 }
4222 
4223 
4225 {
      // Emits the request-start log record for the parsed command `cmd`.
      // Does nothing unless a command request context (m_CmdContext) exists.
4226  if (m_CmdContext.NotNull()) {
4227  // Need to log SID/IP/PHID only if the command does not have
4228  // a job key in the parameters
4229  bool is_worker_node_command = x_WorkerNodeCommand();
4230 
4232 
4233  // A client IP and a session ID must be set as early as possible
4234  if (!is_worker_node_command) {
4235  ITERATE(TNSProtoParams, it, cmd.params) {
4236  if (it->first == "ip")
4238  else if (it->first == "sid")
4240  }
4241  }
4242 
4243  CDiagContext_Extra ctxt_extra =
4245  .Print("_type", "cmd")
4246  .Print("_queue", m_QueueName)
4247  .Print("cmd", cmd.command->cmd)
4248  .Print("peer", GetSocket().GetPeerAddress(eSAF_IP))
4249  .Print("conn", x_GetConnRef());
4250 
      // Every command parameter is appended except ip, sid and ncbi_phid;
      // "status" is logged under the name "job_status".
4251  for (const auto & param : cmd.params) {
4252  // IP is set before the print request start
4253  if (param.first == "ip")
4254  continue;
4255  // SID is set before the print request start
4256  if (param.first == "sid")
4257  continue;
4258  // Skip ncbi_phid because it is printed by request start anyway
4259  if (param.first == "ncbi_phid")
4260  continue;
4261 
4262  if (param.first == "status")
4263  ctxt_extra.Print("job_status", param.second);
4264  else
4265  ctxt_extra.Print(param.first, param.second);
4266  }
4267  ctxt_extra.Flush();
4268 
4269  // Workaround:
4270  // When extra of the GetDiagContext().PrintRequestStart() is destroyed
4271  // or flushed it also resets the status to 0 so I need to set it here
4272  // to 200 though it was previously set to 200 when the request context
4273  // is created.
4275  }
4276 }
4277 
4278 
4280 {
      // Variant of the request-start record that carries a free-form `msg`
      // (logged as "info") instead of command parameters. Does nothing
      // unless a command request context (m_CmdContext) exists.
4281  if (m_CmdContext.NotNull()) {
4284  .Print("_type", "cmd")
4285  .Print("_queue", m_QueueName)
4286  .Print("info", msg)
4287  .Print("peer", GetSocket().GetPeerAddress(eSAF_IP))
4288  .Print("conn", x_GetConnRef())
4289  .Flush();
4290 
4291  // Workaround:
4292  // When extra of the GetDiagContext().PrintRequestStart() is destroyed
4293  // or flushed it also resets the status to 0 so I need to set it here
4294  // to 200 though it was previously set to 200 when the request context
4295  // is created.
4297  }
4298 }
4299 
4300 
4302 {
      // Finishes the current command request context, if there is one, and
      // releases it with m_CmdContext.Reset().
4303  if (m_CmdContext.NotNull()) {
4306  m_CmdContext.Reset();
4307  }
4308 }
4309 
4310 // The function forms a response for various 'get job' commands and prints
4311 // extra to the log if required
4312 void
4314  const CJob & job,
4315  bool cmdv2)
4316 {
4317  if (!job.GetId()) {
4318  // No suitable job found
4319  if (x_NeedCmdLogging())
4320  GetDiagContext().Extra().Print("job_key", "None");
4322  return;
4323  }
4324 
4325  string job_key = q->MakeJobKey(job.GetId());
4326  if (x_NeedCmdLogging()) {
4327  // The only piece required for logging is the job key
4328  GetDiagContext().Extra().Print("job_key", job_key);
4329  }
4330 
4331  if (cmdv2) {
4332  string submitter_notif_info;
4333  if (job.GetSubmNotifPort() != 0) {
4334  string host = CSocketAPI::ntoa(job.GetSubmAddr());
4335  if (host == "127.0.0.1") {
4336  unsigned int my_addr = CSocketAPI::GetLocalHostAddress();
4337  host = CSocketAPI::ntoa(my_addr);
4338  if (host == "127.0.0.1") {
4339  ERR_POST(Warning <<
4340  "Could not detect the self host address "