41 #include <sys/types.h>
47 #define NCBI_USE_ERRCODE_X ConnServ_WorkerNode
49 #define DEFAULT_NS_TIMEOUT 30
80 unsigned short start_port,
unsigned short end_port) :
m_Control(
81 worker_node, start_port, end_port),
119 unsigned int auto_shutdown);
151 virtual void*
Main(
void);
152 virtual void OnExit(
void);
197 unsigned int auto_shutdown)
198 : m_Task(task), m_WorkerNode(worker_node),
199 m_TaskContext(*this),
200 m_Wait1(0,100000), m_Wait2(0,1000000),
202 m_RunInterval(run_delay),
204 m_ThreadName(worker_node->GetAppName() +
"_id")
213 LOG_POST_X(47,
"Has been idle (no jobs to process) over " << idle <<
" seconds. Exiting.");
232 LOG_POST_X(47,
"Has been idle (no jobs to process) over " << idle <<
" seconds. Exiting.");
242 "CWorkerNodeIdleThread::Main: Idle Task failed");
263 m_Thread(thread), m_RunAgain(
false)
371 m_JobProcessorFactory(job_factory),
375 m_CommitJobInterval(2),
376 m_CheckStatusPeriod(2),
377 m_ExclusiveJobSemaphore(1, 1),
378 m_IsProcessingExclusiveJob(
false),
379 m_TotalMemoryLimit(0),
385 m_SingleThreadForced(
false)
389 eJobFactoryIsNotSet,
"The JobFactory is not set.");
418 m_Impl->m_SuspendResume.GetLock()->Suspend(pullback, timeout);
431 m_Impl->m_SuspendResume->Resume();
456 string max_threads =
m_SynRegistry->Get(
"server",
"max_threads",
"8");
466 "] max_threads parameter to number.\n"
476 if (!memlimitstr.empty())
501 unsigned init_threads =
m_SynRegistry->Get(
"server",
"init_threads", 1);
506 catch (exception& ex) {
522 pair<TJobCounter::iterator, bool> insertion(
525 if (!insertion.second) {
529 ++insertion.first->second;
532 job_run_registration->
RegisterRun(
this, insertion.first);
541 string procinfo_file_name)
554 string procinfo_file_name)
563 " build " << build_info.
date <<
" tag " << build_info.
tag);
567 unsigned short start_port, end_port;
570 string control_port_arg(args[
"control_port"] ?
571 args[
"control_port"].AsString() :
580 start_port = NStr::StringToNumeric<unsigned short>(from_port);
581 end_port = to_port.
empty() ? start_port : NStr::StringToNumeric<unsigned short>(to_port);
585 bool is_daemon = daemonize !=
eDefault ? daemonize ==
eOn :
589 vector<string> vhosts;
593 ITERATE(vector<string>, it, vhosts) {
607 ITERATE(vector<string>, it, vhosts) {
629 "] \"wait_server_timeout\" is not used anymore.\n"
631 "] \"communication_timeout\" parameter instead.");
634 if (!procinfo_file_name.empty()) {
636 CFile proc_info_file(procinfo_file_name);
637 if (proc_info_file.
Exists()) {
640 fprintf(stderr,
"'%s' is not writable.\n",
641 procinfo_file_name.c_str());
648 string test_file_name = procinfo_file_name +
".TEST";
649 FILE*
f = fopen(test_file_name.c_str(),
"w");
651 perror((
"Cannot create " + test_file_name).c_str());
658 remove(test_file_name.c_str());
664 LOG_POST_X(53,
"Entering UNIX daemon mode...");
681 control_thread->Prepare();
685 ERR_POST(
"Couldn't start a listener on a port from the "
686 "specified control port range; last port tried: " <<
687 control_thread->GetControlPort() <<
". Another "
688 "process (probably another instance of this worker "
689 "node) is occupying the port(s).");
714 bool reliable_cleanup =
m_SynRegistry->Get(
"server",
"reliable_cleanup",
false);
716 if (reliable_cleanup) {
718 if (child_pid != 0) {
724 remove(procinfo_file_name.c_str());
731 " was terminated by signal " <<
signum);
736 LOG_POST(
"Worker node process " << child_pid <<
737 " terminated normally (exit code 0)");
740 " terminated with exit code " << retcode);
752 if (!procinfo_file_name.empty()) {
754 if ((procinfo_file = fopen(procinfo_file_name.c_str(),
"wt")) ==
NULL) {
755 perror(procinfo_file_name.c_str());
758 fprintf(procinfo_file,
"pid: %lu\nport: %s\n"
759 "client_node: %s\nclient_session: %s\n",
764 fclose(procinfo_file);
770 CDeadline max_wait_for_servers(TWorkerNode_MaxWaitForServers::GetDefault());
808 unsigned idle_run_delay =
m_SynRegistry->Get(
"server",
"idle_run_delay", 30);
809 unsigned auto_shutdown =
m_SynRegistry->Get(
"server",
"auto_shutdown_if_idle", 0);
811 if (idle_run_delay > 0)
813 if (task || auto_shutdown > 0) {
815 task ? idle_run_delay : auto_shutdown, auto_shutdown));
821 control_thread->Run();
825 " ===================\n" <<
827 build_info.
date <<
" tag " << build_info.
tag <<
830 ":" << control_thread->GetControlPort() <<
"\n"
841 main_loop_thread->Run();
842 main_loop_thread->Join();
847 bool force_exit =
m_SynRegistry->Get(
"server",
"force_exit",
false);
849 ERR_POST_X(45,
"Force exit (worker threads will not be waited for)");
873 if (!reliable_cleanup)
877 remove(procinfo_file_name.c_str());
882 LOG_POST(
Info <<
"Stopping the socket server thread...");
883 control_thread->Stop();
884 control_thread->Join();
905 catch (exception& ex) {
906 ERR_POST_X(33,
"Could not stop worker threads: " << ex.what());
920 ERR_POST_X(35,
"Could not unregister from NetSchedule services: "
924 catch (exception& ex) {
925 ERR_POST_X(36,
"Could not unregister from NetSchedule services: " <<
968 if (
m_Impl->m_AdminHosts.empty())
971 if (
m_Impl->m_AdminHosts.find(ha) !=
m_Impl->m_AdminHosts.end())
976 if (
m_Impl->m_AdminHosts.find(ha) !=
m_Impl->m_AdminHosts.end())
994 "GETLOAD" << endl << ends;
1004 ERR_POST_X(43,
"Worker Node at " << it->AsString() <<
1005 " returned error: " << msg);
1012 }
catch (exception&) {}
1014 ERR_POST_X(44,
"Worker Node at " << it->AsString() <<
1015 " returned unknown reply: " << reply);
1068 m_Impl->m_SingleThreadForced =
true;
1073 return *
m_Impl->m_JobProcessorFactory;
1078 return m_Impl->m_MaxThreads;
1083 return m_Impl->m_TotalMemoryLimit;
1088 return m_Impl->m_TotalTimeLimit;
1093 return m_Impl->m_StartupTime;
1098 return m_Impl->m_QueueTimeout;
1103 return m_Impl->m_CommitJobInterval;
1108 return m_Impl->m_CheckStatusPeriod;
1114 return m_Impl->m_JobProcessorFactory->GetAppName();
1123 const auto& job_factory(
m_Impl->m_JobProcessorFactory);
1125 const string job_version(job_factory->GetAppVersion());
1127 return make_pair(job_version.empty() ? version_info.
Print() : job_version, build_info);
1132 return m_Impl->m_NetCacheAPI;
1137 return m_Impl->m_NetScheduleAPI;
1142 return m_Impl->m_NSExecutor;
1147 return m_Impl->m_CleanupEventSource;
1152 return m_Impl->m_SuspendResume->IsSuspended();
1157 return m_Impl->GetQueueName();
1162 return m_Impl->GetClientName();
1167 return m_Impl->GetServiceName();
CAtomicCounter_WithAutoInit –.
CGridControlThread(SGridWorkerNodeImpl *worker_node, unsigned short start_port, unsigned short end_port)
CWorkerNodeControlServer m_Control
const string m_ThreadName
unsigned short GetControlPort() const
virtual void OnExit(void)
Override this to execute finalization code.
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
void SetWorker(SGridWorkerNodeImpl *worker)
CWNJobWatcher & GetJobWatcher()
void SetUDPPort(unsigned short udp_port)
void RequestShutdown(CNetScheduleAdmin::EShutdownLevel level)
Request node shutdown.
void SetReuseJobObject(bool value)
void InterruptUDPPortListening()
static CGridGlobals & GetInstance()
An adapter class for IGridWorkerNodeApp_Listener.
virtual void Notify(const CWorkerNodeJobContext &, EEvent event)
CIdleWatcher(CWorkerNodeIdleThread &idle)
CAtomicCounter_WithAutoInit m_RunningJobs
CWorkerNodeIdleThread & m_Idle
void RegisterRun(CRunningJobLimit *job_counter, CRunningJobLimit::TJobCounter::iterator job_group_it)
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string. Sample usage:
Client API for NetCache server.
Client API for NCBI NetSchedule server.
void GetQueueInfo(CNetServer server, const string &queue_name, TQueueInfo &queue_info)
@ eShutdownImmediate
Urgent shutdown was requested.
Smart pointer to a part of the NetSchedule API that does job retrieval and processing on the worker n...
unsigned short GetPort() const
Extended exit information for waited process.
void ResetJobCounter(unsigned max_number)
bool CountJob(const string &job_group, CJobRunRegistration *job_run_registration)
static unsigned int GetCpuCount(void)
Return number of active CPUs/cores (never less than 1).
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
void SetInfiniteLoopTime(unsigned int infinite_loop_time)
void SetMaxFailuresAllowed(unsigned int max_failures_allowed)
void Print(CNcbiOstream &os) const
void SetMaxJobsAllowed(unsigned int max_jobs_allowed)
unsigned short GetControlPort() const
Worker Node Idle Task Context.
unsigned int m_RunInterval
SGridWorkerNodeImpl * m_WorkerNode
CWorkerNodeIdleThread(IWorkerNodeIdleTask *, SGridWorkerNodeImpl *worker_node, unsigned run_delay, unsigned int auto_shutdown)
unsigned int m_AutoShutdown
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
unsigned x_GetIdleTimeIfShutdown() const
CStopWatch m_AutoShutdownSW
CWorkerNodeIdleTaskContext & GetContext()
bool IsShutdownRequested() const
bool x_GetStopFlag() const
CWorkerNodeIdleTaskContext m_TaskContext
CWorkerNodeIdleThread & operator=(const CWorkerNodeIdleThread &)
IWorkerNodeIdleTask * m_Task
virtual void OnExit(void)
Override this to execute finalization code.
volatile bool m_ShutdownFlag
unsigned int x_GetInterval() const
CWorkerNodeIdleThread(const CWorkerNodeIdleThread &)
const string m_ThreadName
virtual void Process()
Do the actual job. Called by whichever thread handles this request.
CWorkerNodeJobContext m_JobContext
Listener of events generated by CGridWorkerNodeApp.
Clean-up event source for the worker node.
Worker Node Idle Task Interface.
Worker Node Job Factory interface.
const_iterator end() const
iterator_bool insert(const value_type &val)
container_type::value_type value_type
const_iterator find(const key_type &key) const
iterator_bool insert(const value_type &val)
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
static CGridWorkerNode::EDisabledRequestEvents s_ReqEventsDisabled
bool g_IsRequestStopEventEnabled()
#define DEFAULT_NS_TIMEOUT
bool g_IsRequestStartEventEnabled()
const CNcbiEnvironment & GetEnvironment(void) const
Get the application's cached environment.
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
const CVersionAPI & GetFullVersion(void) const
Get the program version information.
@ eTakeOwnership
An object can take ownership of another.
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
void SetDiagPostFlag(EDiagPostFlag flag)
Set the specified flag (globally).
#define LOG_POST_X(err_subcode, message)
void SetSessionID(const string &session)
static CRequestContext & GetRequestContext(void)
Shortcut to CDiagContextThreadData::GetThreadData().GetRequestContext()
static TPID GetPID(void)
Get cached PID (read real PID if not cached yet).
#define ERR_POST_X(err_subcode, message)
Error posting with default error code and given error subcode.
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
#define LOG_POST(message)
This macro is deprecated, and it is strongly recommended that all projects (except tests) move to macro...
@ eDPF_MergeLines
Escape EOLs.
@ eDPF_PreMergeLines
Obsolete. Use eDPF_MergeLines.
void Critical(CExceptionArgs_Base &args)
void Error(CExceptionArgs_Base &args)
#define NCBI_CATCH_ALL_X(err_subcode, message)
TErrCode GetErrCode(void) const
Get error code.
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
void Info(CExceptionArgs_Base &args)
bool CheckAccess(TMode access_mode) const
Check access rights.
virtual bool Exists(void) const
Check existence of file.
@ fWrite
Write permission.
time_t GetStartupTime() const
CWorkerNodeIdleThread & m_Thread
IWorkerNodeJobFactory & GetJobFactory()
const string & GetQueueName() const
virtual ~IWorkerNodeJobWatcher()
void SetListener(IGridWorkerNodeApp_Listener *listener)
unsigned GetCommitJobInterval() const
bool IsHostInAdminHostsList(const string &host) const
bool IsShutdownRequested() const
TVersion GetAppVersion() const
const string & GetClientName() const
CNetCacheAPI GetNetCacheAPI() const
pair< string, SBuildInfo > TVersion
virtual void Run(CWorkerNodeIdleTaskContext &)=0
Do the Idle task here.
EDisabledRequestEvents
Disable the automatic logging of request-start and request-stop events by the framework itself.
int Run(ESwitch daemonize=eDefault, string procinfo_file_name=string())
Start job execution loop.
unsigned int GetMaxThreads() const
Get the maximum threads running simultaneously.
unsigned GetTotalTimeLimit() const
Get total time limit (automatic restart after that)
unsigned GetQueueTimeout() const
CNetScheduleExecutor GetNSExecutor() const
IWorkerNodeCleanupEventSource * GetCleanupEventSource()
const string & GetServiceName() const
const SServerParams & GetServerParams()
static void DisableDefaultRequestEventLogging(EDisabledRequestEvents disabled_events=eDisableStartStop)
string GetAppName() const
void SetProgramVersion(const string &pv)
Set program version (like: MyProgram v.
CNetScheduleExecutor GetExecutor()
Create an instance of CNetScheduleExecutor.
CWorkerNodeIdleTaskContext(CWorkerNodeIdleThread &thread)
CNetScheduleAPI GetNetScheduleAPI() const
CNetScheduleAdmin GetAdmin()
void Suspend(bool pullback, unsigned timeout)
Uint8 GetTotalMemoryLimit() const
Get total memory limit (automatic restart if node grows more than that)
unsigned GetCheckStatusPeriod() const
CNetRef< SGridWorkerNodeImpl > m_Impl
void SetClientNode(const string &client_node)
void Reset(void)
Reset reference object.
uint64_t Uint8
8-byte (64-bit) unsigned integer
const unsigned long kInfiniteTimeoutMs
Infinite timeout in milliseconds.
bool IsPresent(void) const
TRUE if the object contains information about the process state.
static TPid GetPid(void)
Get process identifier (pid) for the current process.
bool IsExited(void) const
TRUE if the process terminated normally.
int GetSignal(void) const
Get the signal number that has caused the process to terminate (UNIX only).
static TPid Daemonize(const char *logfile=0, TDaemonFlags flags=0)
Go daemon.
static TPid Fork(TForkFlags flags=fFF_UpdateDiag)
Fork the process.
int Wait(unsigned long timeout=kInfiniteTimeoutMs, CExitInfo *info=0) const
Wait until process terminates.
pid_t TPid
Process identifier (PID) and process handle.
bool IsSignaled(void) const
TRUE if the process terminated by a signal (UNIX only).
int GetExitCode(void) const
Get process exit code.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
EIO_Status ReadLine(string &str)
Read a line from socket (up to CR-LF, LF, or null character, discarding any of the EOLs).
static string gethostname(ESwitch log=eOff)
Return empty string on error.
static unsigned int gethostbyname(const string &host, ESwitch log=eOff)
Return 0 on error.
EIO_Status GetStatus(EIO_Event direction) const
Return status of *last* I/O operation without making any actual I/O.
EIO_Status Write(const void *buf, size_t size, size_t *n_written=0, EIO_WriteMethod how=eIO_WritePersist)
Write to socket.
static int CompareNocase(const CTempString s1, SIZE_TYPE pos, SIZE_TYPE n, const char *s2)
Case-insensitive compare of a substring with another string.
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
static Uint8 StringToUInt8_DataSize(const CTempString str, TStringToNumFlags flags=0)
Convert string that can contain "software" qualifiers to Uint8.
static SIZE_TYPE Find(const CTempString str, const CTempString pattern, ECase use_case=eCase, EDirection direction=eForwardSearch, SIZE_TYPE occurrence=0)
Find the pattern in the string.
bool empty(void) const
Return true if the represented string is empty (i.e., the length is zero)
static string & Replace(const string &src, const string &search, const string &replace, string &dst, SIZE_TYPE start_pos=0, SIZE_TYPE max_replace=0, SIZE_TYPE *num_replace=0)
Replace occurrences of a substring within a string.
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
static string TruncateSpaces(const string &str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
virtual void KillAllThreads(TKillFlags flags)
Causes all threads in the pool to exit cleanly after finishing all pending requests,...
void Spawn(unsigned int num_threads)
Start processing threads.
void Run(void)
Enter the main loop.
unsigned int m_MaxThreads
Maximum simultaneous threads.
void StartListening(void)
Start listening before the main loop.
@ eCouldntListen
Unable to bind listening port.
bool Run(TRunMode flags=fRunDefault)
Run the thread.
virtual void OnExit(void)
Override this to execute finalization code.
static void SetCurrentThreadName(const CTempString &)
Set name for the current thread.
bool TryWait(unsigned int timeout_sec=0, unsigned int timeout_nsec=0)
Timed wait.
void Post(unsigned int count=1)
Increment the semaphore by "count".
void Join(void **exit_data=0)
Wait for the thread termination.
double Restart(void)
Return time elapsed since first Start() or last Restart() call (in seconds).
CNanoTimeout GetRemainingTime(void) const
Get time left to the expiration.
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
bool IsExpired(void) const
Check if the deadline is expired.
void Stop(void)
Suspend the timer.
enum ENcbiSwitch ESwitch
Aux.
enum ENcbiOwnership EOwnership
Ownership relations between objects.
@ eIO_Success
everything is fine, no error occurred
@ eIO_Open
also serves as no-event indicator in SOCK_Poll
const SBuildInfo & GetBuildInfo() const
Get build info (date and tag, if set)
virtual string Print(void) const
Print version information.
unsigned int
A callback function used to compare two keys in a database.
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
void SleepSec(unsigned long sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Sleep.
const char *const kNetScheduleAPIDriverName
double f(double x_, const double &y_)
const CNSPreciseTime default_timeout(3600, 0)
static CNamedPipeClient * client
This class allows adding build info (date and tag) to the application version.
unique_ptr< IGridWorkerNodeApp_Listener > m_Listener
CSynRegistry::TPtr m_SynRegistry
bool EnterExclusiveMode()
unsigned m_CheckStatusPeriod
string GetAppName() const
CNetScheduleAPI m_NetScheduleAPI
CRef< CJobCommitterThread > m_JobCommitterThread
unsigned m_TotalTimeLimit
void x_StartWorkerThreads()
CNcbiApplicationAPI & m_App
IWorkerNodeCleanupEventSource * GetCleanupEventSource() const override
Get interface for registering clean-up event listeners.
const CArgs & GetArgs() const override
Get command line arguments.
unsigned m_CommitJobInterval
CStdPoolOfThreads * m_ThreadPool
CRef< CWorkerNodeIdleThread > m_IdleThread
const string & GetServiceName() const
void x_NotifyJobWatchers(const CWorkerNodeJobContext &job_context, IWorkerNodeJobWatcher::EEvent event)
CSemaphore m_ExclusiveJobSemaphore
void x_StopWorkerThreads()
CRunningJobLimit m_JobsPerSessionID
bool m_ProgressLogRequested
CFastMutex m_JobWatcherMutex
unique_ptr< IWorkerNodeJobFactory > m_JobProcessorFactory
unsigned int m_MaxThreads
CRef< CWorkerNodeCleanup > m_CleanupEventSource
bool x_AreMastersBusy() const
set< unsigned int > m_AdminHosts
size_t m_QueueEmbeddedOutputSize
CNetScheduleAPI GetNetScheduleAPI() const override
Get the shared NetScheduleAPI object used by the worker node framework.
void LeaveExclusiveMode()
set< SSocketAddress > m_Masters
const string & GetQueueName() const
CRef< IRegistry > m_Registry
SGridWorkerNodeImpl(CNcbiApplicationAPI &app, IWorkerNodeJobFactory *job_factory)
int Run(ESwitch daemonize, string procinfo_file_name)
bool m_SingleThreadForced
CNetCacheAPI GetNetCacheAPI() const override
Get the shared NetCacheAPI object used by the worker node framework.
const string & GetClientName() const
void AddJobWatcher(IWorkerNodeJobWatcher &job_watcher, EOwnership owner=eNoOwnership)
bool m_IsProcessingExclusiveJob
CNetCacheAPI m_NetCacheAPI
SThreadSafe< SSuspendResume > m_SuspendResume
unsigned m_ThreadPoolTimeout
const CNcbiEnvironment & GetEnvironment() const override
Get environment variables.
bool WaitForExclusiveJobToFinish()
CNetScheduleExecutor m_NSExecutor
CRunningJobLimit m_JobsPerClientIP
const IRegistry & GetConfig() const override
Get a config file registry.
void SetAuthParam(const string &param_name, const string &param_value)
bool m_UseEmbeddedStorage
ESwitch retry_on_exception
CNetScheduleNotificationHandler m_NotificationHandler
static SSocketAddress Parse(const string &address, SHost::EName name=SHost::EName::eResolved)
EState CheckState() volatile
bool IsJobPullbackTimerExpired()
atomic< unsigned > m_CurrentJobGeneration
atomic< bool > m_IsSuspended
CDeadline m_JobPullbackTime
void SetJobPullbackTimer(unsigned seconds)
void Suspend(bool pullback, unsigned timeout)