45 #include <sys/types.h>
46 #include <sys/socket.h>
47 #include <netinet/in.h>
48 #include <netinet/tcp.h>
335 {
"affinity_may_change",
350 {
"affinity_may_change",
594 ((
string*)
data)->append((
const char *) ptr,
size);
615 m_ProcessMessage(
NULL),
619 m_WithinBatchSubmit(
false),
620 m_SingleCmdParser(sm_CommandMap),
621 m_BatchHeaderParser(sm_BatchHeaderMap),
622 m_BatchEndParser(sm_BatchEndMap),
623 m_ClientIdentificationPrinted(
false),
624 m_RollbackAction(
NULL)
708 (read_count > 0 || write_count > 0)) {
716 ERR_POST(
"Unseccessfull client socket shutdown. "
717 "The socket may have data not delivered to the client. "
718 "Error code: " << status <<
": " <<
IO_StatusStr(status));
746 ERR_POST(
"eCommunicationError:Connection pool full");
749 ERR_POST(
"eCommunicationError:Unpollable connection");
752 ERR_POST(
"eCommunicationError:Request queue full");
755 ERR_POST(
"eCommunicationError:Unknown overflow error");
767 ERR_POST(
"NetSchedule is shutting down. Client input rejected.");
770 "is shutting down. Session aborted." +
777 string error_client_message;
778 unsigned int error_code;
819 catch (
const exception & ex) {
821 error_client_message =
"ERR:" +
823 "error - " +
string(ex.
what()));
827 ERR_POST(
"ERR:Unknown server exception.");
828 error_client_message =
"ERR:eInternalError:Unknown server exception.";
852 setsockopt(fd, IPPROTO_TCP, TCP_QUICKACK, &
val,
sizeof(
val));
860 size_t required_size)
872 #if defined(_DEBUG) && !defined(NDEBUG)
880 nanosleep(&delay,
NULL);
897 msg_size =
value.size();
898 while (msg_size >= 1 &&
msg[msg_size-1] ==
'\n')
900 required_size = msg_size + 1;
922 size_t msg_size =
msg.size();
923 bool has_eom =
false;
925 while (msg_size >= 1 &&
msg[msg_size-1] ==
'\n') {
930 size_t required_size = msg_size + 1;
931 const char * msg_buf =
NULL;
934 msg_buf =
msg.data();
947 #if defined(_DEBUG) && !defined(NDEBUG)
970 size_t written_bytes,
976 "Error writing message to the client. "
978 "Socket write error status: " +
IO_StatusStr(write_result) +
". "
979 "Written bytes: " + to_string(written_bytes) +
". "
980 "Socket write timing: " + to_string(
double(timing)) +
". "
981 "Message begins with: ";
983 report +=
msg.substr(0, 32) +
" (TRUNCATED)";
1011 unsigned int peer_addr;
1056 string msg =
"Error authenticating client: '";
1090 for (
const auto & param : params) {
1091 if (param.first ==
"status") {
1092 diag_extra.
Print(
"job_status", param.second);
1094 diag_extra.
Print(param.first, param.second);
1195 bool restore_client =
false;
1197 unsigned int orig_client_id = 0;
1212 restore_client =
true;
1220 "Job queue is required");
1239 bool client_was_found =
false;
1240 bool session_was_reset =
false;
1242 bool had_wn_pref_affs =
false;
1243 bool had_reader_pref_affs =
false;
1247 session_was_reset, old_session,
1248 had_wn_pref_affs, had_reader_pref_affs);
1249 if (client_was_found && session_was_reset) {
1251 string wn_val =
"true";
1252 if (!had_wn_pref_affs)
1253 wn_val =
"had none";
1254 string reader_val =
"true";
1255 if (!had_reader_pref_affs)
1256 reader_val =
"had none";
1261 .
Print(
"client_old_session", old_session)
1262 .
Print(
"wn_preferred_affinities_reset", wn_val)
1263 .
Print(
"reader_preferred_affinities_reset", reader_val);
1273 if (restore_client) {
1320 if (!size_str.
empty())
1347 ERR_POST(
"Error processing command: " << ex);
1359 ERR_POST(
"Unknown error while expecting BTCH or ENDS");
1407 ERR_POST(
"Error processing command: " << ex);
1410 "Invalid batch submission, syntax error" +
1421 ERR_POST(
"Arguments parsing unknown exception. "
1422 "Batch submit is rejected.");
1425 "Arguments parsing unknown exception" +
1480 ERR_POST(
"Error processing command: " << ex);
1483 "Batch submit error - unexpected end of batch" +
1494 ERR_POST(
"Unknown error while expecting ENDB.");
1497 "Unknown error while expecting ENDB." +
1513 ctx->SetRequestID();
1516 .Print(
"_type",
"cmd")
1520 .Print(
"cmd",
"BTCH")
1537 .
Print(
"start_job", job_id)
1538 .
Print(
"commit_time",
1541 .
Print(
"transaction_time",
1556 SetRequestStatus(ex.ErrCodeToHTTPStatusCode());
1602 <<
" for unknown job: "
1615 reply.reserve(1024);
1617 reply.append(
"OK:job_status=")
1619 .append(
"&job_exptime=")
1620 .append(to_string(lifetime.
Sec()));
1623 reply.append(
"&pause=pullback");
1625 reply.append(
"&pause=nopullback");
1628 reply.append(
"&msg=")
1649 string progress_msg;
1651 client_ip, client_sid,
1652 client_phid, progress_msg,
1658 <<
" for unknown job: "
1672 reply.reserve(1024);
1673 reply.append(
"OK:job_status=")
1675 .append(
"&job_exptime=")
1676 .append(to_string(lifetime.
Sec()));
1679 reply.append(
"&pause=pullback");
1681 reply.append(
"&pause=nopullback");
1684 reply.append(
"&msg=")
1706 <<
" with neither add list nor del list");
1713 list<string> aff_to_add_list;
1714 list<string> aff_to_del_list;
1717 "\t,", aff_to_add_list);
1719 "\t,", aff_to_del_list);
1727 for (list<string>::const_iterator k = aff_to_add_list.begin();
1728 k != aff_to_add_list.end(); ++k) {
1729 if (find(aff_to_del_list.begin(), aff_to_del_list.end(), *k) !=
1730 aff_to_del_list.end()) {
1733 " is in both add and del lists" +
1758 for (list<string>::const_iterator k = msgs.begin();
1759 k != msgs.end(); ++k)
1760 msg +=
"WARNING:" + *k +
";";
1777 list<string> aff_to_set;
1928 "Neither job key nor a group nor an "
1929 "affinity nor a status list is provided "
1930 "for the CANCEL command");
1933 "Job key or any combination of a group and an affinity "
1945 "CANCEL can accept either a job "
1946 "key or any combination of a group "
1947 "and an affinity and job statuses");
1949 x_WriteMessage(
"ERR:eInvalidParameter:CANCEL can accept either a job "
1950 "key or any combination of a group and an affinity and "
1964 vector<string> warnings;
1965 vector<TJobStatus> statuses;
1968 bool reported =
false;
1969 vector<TJobStatus>::iterator k =
1974 warnings.push_back(
"eInvalidJobStatus:unknown job "
1975 "status in the status list");
1978 "Unknown job status in the status list. "
1979 "Ignore and continue.");
1992 k = statuses.begin();
1993 while (k != statuses.end()) {
1995 warnings.push_back(
"eIgnoringCanceledStatus:attempt to "
1996 "cancel jobs in the 'Canceled' status");
1999 "Attempt to cancel jobs in the 'Canceled' "
2000 "status. Ignore and continue.");
2014 if (warnings.empty())
2018 for (vector<string>::const_iterator k = warnings.begin();
2019 k != warnings.end(); ++k) {
2020 msg +=
"WARNING:" + *k +
";";
2037 "CANCEL for unknown job: " <<
2066 <<
" for unknown job: "
2081 reply.reserve(2048);
2084 string pause_status_msg;
2088 pause_status_msg =
"&pause=pullback";
2090 pause_status_msg =
"&pause=nopullback";
2092 string progress_msg_part;
2094 progress_msg_part.append(
"&msg=")
2098 .append(
"job_status=")
2100 .append(
"&client_ip=")
2102 .append(
"&client_sid=")
2104 .append(
"&ncbi_phid=")
2106 .append(
"&job_exptime=")
2107 .append(to_string(lifetime.
Sec()))
2108 .append(
"&ret_code=")
2112 .append(
"&err_msg=")
2116 .append(pause_status_msg)
2117 .append(progress_msg_part)
2121 .append(to_string((
int) job.
GetStatus()))
2170 string pause_status_str;
2173 pause_status_str =
"pullback";
2175 pause_status_str =
"nopullback";
2185 "pause: " + pause_status_str);
2192 list<string> aff_list;
2195 list<string> group_list;
2200 string added_pref_aff;
2214 added_pref_aff) ==
false) {
2223 if (!added_pref_aff.empty()) {
2227 .
Print(
"added_preferred_affinity", added_pref_aff);
2232 .
Print(
"added_preferred_affinity", added_pref_aff);
2289 ERR_POST(
Warning <<
"Accepting results for a job in the FAILED state.");
2298 <<
" results. The job has already been done.");
2308 <<
" results. The job is unknown");
2318 <<
" results; job is in "
2323 "Cannot accept job results; job is in " +
2355 <<
" results. The job is unknown");
2358 ERR_POST(
Warning <<
"Accepting results for a job in the FAILED state.");
2365 <<
" results. The job has already been done.");
2382 string pause_status_str;
2385 pause_status_str =
"pullback";
2387 pause_status_str =
"nopullback";
2391 "pause: " + pause_status_str);
2397 list<string> aff_list;
2401 string added_pref_aff;
2415 added_pref_aff) ==
false) {
2421 if (added_pref_aff.empty() ==
false) {
2425 .
Print(
"added_preferred_affinity", added_pref_aff);
2430 .
Print(
"added_preferred_affinity", added_pref_aff);
2471 <<
"MGET for unknown job "
2529 x_WriteMessage(
"ERR:eInvalidJobStatus:Cannot fail job; job is in " +
2538 if (warning.empty())
2583 warning, return_option);
2586 if (warning.empty())
2610 x_WriteMessage(
"ERR:eInvalidJobStatus:Cannot return job; job is in " +
2625 bool auth_token_ok =
true;
2645 if (!auth_token_ok) {
2675 x_WriteMessage(
"ERR:eInvalidJobStatus:Cannot reschedule job; job is in " +
2712 x_WriteMessage(
"ERR:eInvalidJobStatus:Cannot redo job; job is in " +
2729 <<
" in JDEX for job "
2776 <<
" in JDREX for job "
2821 size_t last_event_index = 0;
2839 string progress_msg_part;
2841 progress_msg_part =
"&msg=" +
2846 "&last_event_index=" +
2847 to_string(last_event_index) +
2863 if (!what.empty() && what !=
"QCLASSES" && what !=
"QUEUES" &&
2864 what !=
"JOBS" && what !=
"ALL" && what !=
"CLIENTS" &&
2865 what !=
"NOTIFICATIONS" && what !=
"AFFINITIES" &&
2866 what !=
"GROUPS" && what !=
"WNODE" && what !=
"SERVICES" &&
2867 what !=
"ALERTS" && what !=
"SCOPES") {
2869 "Unsupported '" + what +
2870 "' parameter for the STAT command.");
2873 if (q ==
NULL && (what ==
"CLIENTS" || what ==
"NOTIFICATIONS" ||
2874 what ==
"AFFINITIES" || what ==
"GROUPS" ||
2875 what ==
"WNODE" || what ==
"SCOPES")) {
2877 "STAT " + what +
" requires a queue");
2883 if (what ==
"QCLASSES") {
2893 if (what ==
"QUEUES") {
2903 if (what ==
"SERVICES") {
2908 k != services.
end(); ++k) {
2911 output += k->first +
"=" + k->second;
2917 if (what ==
"ALERTS") {
2935 info +=
"OK:SubmitsDisabledEffective: ";
2940 info +=
"OK:DrainedShutdown: ";
2956 if (!what.empty() && what !=
"ALL") {
2962 string info =
"OK:Started: " +
2965 info +=
"OK:SubmitsDisabledEffective: ";
2970 info +=
"OK:SubmitsDisabledPrivate: ";
2982 if (what ==
"ALL") {
2986 " bit_blk=" + to_string(bv_stat.
bit_blocks) +
2987 "; gap_blk=" + to_string(bv_stat.
gap_blocks) +
3011 "Access denied: admin privileges required");
3024 vector<string> config_warnings;
3025 bool admin_decrypt_error(
false);
3028 if (!config_warnings.empty()) {
3031 for (vector<string>::const_iterator k = config_warnings.begin();
3032 k != config_warnings.end(); ++k) {
3044 msg =
"ERR:eInvalidParameter:Configuration file is not "
3045 "well formed. " +
msg;
3046 if (
msg.size() > 1024) {
3048 msg +=
" TRUNCATED";
3057 vector<string> config_checksum_warnings;
3060 if (config_checksum_warnings.empty()) {
3064 for (vector<string>::const_iterator
3065 k = config_checksum_warnings.begin();
3066 k != config_checksum_warnings.end(); ++k)
3081 if (what_changed.
GetSize() == 0 &&
3083 services_changed.
GetSize() == 0) {
3089 "changeable parameters were identified in the new "
3097 diff.
SetByKey(k.GetKey(), k.GetNode());
3099 diff.
SetByKey(k.GetKey(), k.GetNode());
3101 string diff_as_string = diff.
Repr();
3111 "file has not been changed, RECO ignored;" +
3135 "parameters or any combination of a group and "
3136 "an affinity and job statuses");
3138 x_WriteMessage(
"ERR:eInvalidParameter:DUMP can accept either a job "
3139 "key or no parameters or any combination of a group and "
3148 bool reported =
false;
3149 vector<TJobStatus>::iterator k =
3156 "Unknown job status in the status list. "
3157 "Ignore and continue.");
3166 vector<string> warnings;
3175 statuses.size() == 0)
3198 if (job_info.empty()) {
3221 "already in drain shutdown state;" +
3243 string configuration;
3263 configuration =
string(converter);
3274 static string reply =
3299 bool mem_used_result =
3302 int proc_fd_soft_limit;
3303 int proc_fd_hard_limit;
3305 &proc_fd_soft_limit,
3306 &proc_fd_hard_limit);
3309 #if defined(_DEBUG) && !defined(NDEBUG)
3314 if (err_emul.
as_int >= 0)
3315 proc_fd_used = err_emul.
as_int;
3319 if (err_emul.
as_int >= 0)
3335 if (process_time_result)
3336 reply +=
"&user_time=" + to_string(user_time) +
3337 "&system_time=" + to_string(system_time) +
3338 "&real_time=" + to_string(real_time);
3340 reply +=
"&user_time=n/a&system_time=n/a&real_time=n/a";
3342 if (physical_memory > 0)
3343 reply +=
"&physical_memory=" + to_string(physical_memory);
3345 reply +=
"&physical_memory=n/a";
3347 if (mem_used_result)
3348 reply +=
"&mem_used_total=" + to_string(mem_used.
total) +
3349 "&mem_used_total_peak=" + to_string(mem_used.
total_peak) +
3350 "&mem_used_resident=" + to_string(mem_used.
resident) +
3351 "&mem_used_resident_peak=" + to_string(mem_used.
resident_peak) +
3352 "&mem_used_shared=" + to_string(mem_used.
shared) +
3353 "&mem_used_data=" + to_string(mem_used.
data) +
3354 "&mem_used_stack=" + to_string(mem_used.
stack) +
3355 "&mem_used_text=" + to_string(mem_used.
text) +
3356 "&mem_used_lib=" + to_string(mem_used.
lib) +
3357 "&mem_used_swap=" + to_string(mem_used.
swap);
3359 reply +=
"&mem_used_total=n/a"
3360 "&mem_used_total_peak=n/a"
3361 "&mem_used_resident=n/a"
3362 "&mem_used_resident_peak=n/a"
3363 "&mem_used_shared=n/a"
3364 "&mem_used_data=n/a"
3365 "&mem_used_stack=n/a"
3366 "&mem_used_text=n/a"
3368 "&mem_used_swap=n/a";
3370 if (proc_fd_soft_limit >= 0)
3371 reply +=
"&proc_fd_soft_limit=" + to_string(proc_fd_soft_limit);
3373 reply +=
"&proc_fd_soft_limit=n/a";
3375 if (proc_fd_hard_limit >= 0)
3376 reply +=
"&proc_fd_hard_limit=" + to_string(proc_fd_hard_limit);
3378 reply +=
"&proc_fd_hard_limit=n/a";
3380 if (proc_fd_used >= 0)
3381 reply +=
"&proc_fd_used=" + to_string(proc_fd_used);
3383 reply +=
"&proc_fd_used=n/a";
3385 if (proc_thread_count >= 1)
3386 reply +=
"&proc_thread_count=" + to_string(proc_thread_count);
3388 reply +=
"&proc_thread_count=n/a";
3391 if (!alerts.empty())
3392 reply +=
"&" + alerts;
3401 size_t args_size = arguments.
Size();
3402 string cmdline_args;
3403 for (
size_t index = 0; index < args_size; ++index) {
3405 cmdline_args +=
" ";
3406 cmdline_args += arguments[index];
3483 if (qname.empty()) {
3498 string linked_sections_part;
3502 vector<string> warnings;
3507 jobs_per_state, warnings);
3511 jobs_part.append(1,
'&')
3514 .append(to_string(jobs_per_state[index]));
3515 total += jobs_per_state[index];
3517 jobs_part.append(
"&Total=")
3518 .append(to_string(total));
3521 k = linked_sections.
begin(); k != linked_sections.end(); ++k) {
3522 string prefix((k->first).c_str() + strlen(
"linked_section_"));
3524 j != k->second.end(); ++j) {
3525 linked_sections_part.append(1,
'&')
3535 qname_part.append(
"queue_name=")
3598 bool client_was_found =
false;
3599 bool session_was_reset =
false;
3601 bool had_wn_pref_affs =
false;
3602 bool had_reader_pref_affs =
false;
3605 session_was_reset, old_session,
3607 had_reader_pref_affs);
3608 if (client_was_found && session_was_reset) {
3610 string wn_val =
"true";
3611 if (!had_wn_pref_affs)
3612 wn_val =
"had none";
3613 string reader_val =
"true";
3614 if (!had_reader_pref_affs)
3615 reader_val =
"had none";
3620 .
Print(
"client_old_session", old_session)
3621 .
Print(
"wn_preferred_affinities_reset", wn_val)
3622 .
Print(
"reader_preferred_affinities_reset", reader_val);
3642 if (used_slots >= max_slots) {
3643 ERR_POST(
"All scope slots are in use");
3661 unsigned int max_input_size;
3662 unsigned int max_output_size;
3670 string result(
"OK:max_input_size=" +
3671 to_string(max_input_size) +
"&" +
3672 "max_output_size=" +
3673 to_string(max_output_size));
3676 k = linked_sections.
begin(); k != linked_sections.end(); ++k) {
3677 string prefix((k->first).c_str() + strlen(
"linked_section_"));
3679 j != k->second.end(); ++j) {
3680 result +=
"&" + prefix +
"::" +
3687 x_WriteMessage(
"OK:max_input_size=" + to_string(max_input_size) +
";"
3688 "max_output_size=" + to_string(max_output_size) +
";" +
3698 string configuration;
3701 configuration +=
"OK:" + it->first +
'=' + it->second +
kEndOfResponse;
3728 bool no_more_jobs =
true;
3729 string added_pref_aff;
3731 list<string> aff_list;
3733 list<string> group_list;
3750 added_pref_aff) ==
false) {
3756 unsigned int job_id = job.
GetId();
3776 if (!added_pref_aff.empty()) {
3780 .
Print(
"added_preferred_affinity", added_pref_aff);
3785 .
Print(
"added_preferred_affinity", added_pref_aff);
3799 .
Print(
"no_more_jobs", no_more_jobs);
3890 x_WriteMessage(
"ERR:eInvalidJobStatus:Cannot reread job; job is in " +
3896 <<
"; job has not been read yet");
3897 x_WriteMessage(
"OK:WARNING:eJobNotRead:The job has not been read yet;" +
3929 <<
" read job; job is in "
3950 if (data_size > limit) {
3953 <<
" bytes. Received "
3958 "It must be <= " + to_string(limit) +
3959 " bytes. Received " + to_string(data_size) +
3974 bool client_found =
false;
3975 bool had_wn_pref_affs =
false;
3976 bool had_reader_pref_affs =
false;
3980 old_session, had_wn_pref_affs, had_reader_pref_affs);
3984 string wn_val =
"true";
3985 if (!had_wn_pref_affs)
3986 wn_val =
"had none";
3987 string reader_val =
"true";
3988 if (!had_reader_pref_affs)
3989 reader_val =
"had none";
3994 .
Print(
"client_old_session", old_session)
3995 .
Print(
"wn_preferred_affinities_reset", wn_val)
3996 .
Print(
"reader_preferred_affinities_reset", reader_val);
4019 "Server is in drained shutting down state" +
4040 x_WriteMessage(
"OK:WARNING:eSubmitsDisabledForServer:Submits are "
4063 string reply =
"OK:WARNING:eQueueAlreadyPaused:The queue has "
4064 "already been paused (previous pullback value is ";
4066 else reply +=
"false";
4067 reply +=
", new pullback value is ";
4069 else reply +=
"false";
4102 "Command is obsolete and will be ignored;" +
4112 "Anonymous client (no client_node and client_session"
4113 " at handshake) cannot " + message);
4124 "Either both or neither of the port and "
4125 "timeout parameters must be 0");
4133 "Invalid authorization token. It cannot be empty.");
4145 "without preferred affinities and "
4146 "with any_aff flag set to false "
4147 "will never match any job.");
4152 "It is forbidden to have both any_affinity and "
4153 "exclusive_new_aff GET2 flags set to 1.");
4157 "It is forbidden to have both prioritized_aff and "
4158 "wnode_aff GET2 flags set to 1.");
4162 "It is forbidden to have both prioritized_aff and "
4163 "exclusive_new_aff GET2 flags set to 1.");
4167 "If the prioritized_aff GET2 flag set to 1 then "
4168 "a non empty list of explicit affinities must be provided.");
4179 ERR_POST(
Warning <<
"The job read request without explicit affinities, "
4180 "without preferred affinities and "
4181 "with any_aff flag set to false "
4182 "will never match any job.");
4187 "It is forbidden to have both any_aff and "
4188 "exclusive_new_aff READ2 flags set to 1.");
4192 "It is forbidden to have both prioritized_aff and "
4193 "reader_aff READ2 flags set to 1.");
4197 "It is forbidden to have both prioritized_aff and "
4198 "exclusive_new_aff READ2 flags set to 1.");
4202 "If the prioritized_aff READ2 flag set to 1 then "
4203 "a non empty list of explicit affinities must be provided.");
4213 "QINF2 command expects a queue name or a service name. "
4214 "Nothing has been provided.");
4219 "QINF2 command expects only one value: queue name or "
4220 "a service name. Both have been provided.");
4234 if (!is_worker_node_command) {
4236 if (it->first ==
"ip")
4238 else if (it->first ==
"sid")
4245 .Print(
"_type",
"cmd")
4247 .Print(
"cmd",
cmd.command->cmd)
4251 for (
const auto & param :
cmd.params) {
4253 if (param.first ==
"ip")
4256 if (param.first ==
"sid")
4259 if (param.first ==
"ncbi_phid")
4262 if (param.first ==
"status")
4263 ctxt_extra.
Print(
"job_status", param.second);
4265 ctxt_extra.
Print(param.first, param.second);
4284 .Print(
"_type",
"cmd")