57 #if __NC_TASKS_MONITOR
58 m_TaskName =
"CNCActiveHandler_Proxy";
159 m_ReservedForBG(
false),
160 m_ProcessingStarted(
false),
162 m_GotAnyAnswer(
false),
163 m_CmdFromClient(
false),
166 #if __NC_TASKS_MONITOR
167 m_TaskName =
"CNCActiveHandler";
197 CNCActiveHandler::State
200 SRV_FATAL(
"CNCActiveHandler started in invalid state");
229 CNCActiveHandler::State
268 CNCActiveHandler::State
279 if (
ctx == proxy_ctx)
430 const string& password,
453 if (!password.empty()) {
465 const string& password,
485 if (!password.empty()) {
497 const string& password,
526 if (!password.empty()) {
538 const string& password,
558 if (!password.empty()) {
570 const string& password,
606 if (!password.empty()) {
622 const string& password,
655 if (!password.empty()) {
705 const string& password,
736 if (!password.empty()) {
748 const string& password,
749 unsigned int add_time,
777 if (!password.empty()) {
794 m_CmdToSend += filters ?
"PROXY_BLIST2 \"" :
"PROXY_BLIST \"";
851 if (filters->
cr_srv != 0) {
1080 Uint8 remote_rec_no)
1100 CNCActiveHandler::State
1122 CNCActiveHandler::State
1129 m_ErrMsg =
"ERR:Connection closed by peer";
1136 CNCActiveHandler::State
1151 CNCActiveHandler::State
1195 m_ErrMsg =
"ERR:Error in TaskServer";
1225 CNCActiveHandler::State
1276 CNCActiveHandler::State
1286 CNCActiveHandler::State
1297 CNCActiveHandler::State
1302 <<
"Protocol error. Got response: '"
1311 Uint4 start_word = 0x01020304;
1316 CNCActiveHandler::State
1322 Uint4 finish_word = 0xFFFFFFFF;
1329 CNCActiveHandler::State
1336 CNCActiveHandler::State
1342 list<CTempString> params;
1344 if (params.size() < 5)
1347 list<CTempString>::const_iterator param_it = params.begin();
1369 CNCActiveHandler::State
1432 CNCActiveHandler::State
1438 pos +=
sizeof(
"NEED_ABORT") - 1;
1447 pos +=
sizeof(
"HAVE_NEWER") - 1;
1463 CNCActiveHandler::State
1542 CNCActiveHandler::State
1564 CNCActiveHandler::State
1572 CNCActiveHandler::State
1576 if (pos == string::npos) {
1579 <<
"SIZE is not found in peer response");
1583 pos +=
sizeof(
"SIZE=") - 1;
1592 <<
"Cannot parse data size: " << ex);
1611 CNCActiveHandler::State
1634 CNCActiveHandler::State
1649 const char* keywd =
"Content-Length:";
1658 pos += strlen(keywd);
1674 CNCActiveHandler::State
1689 m_ErrMsg =
"ERR:Error writing to peer";
1691 m_ErrMsg =
"ERR:Error writing blob data to client";
1697 CNCActiveHandler::State
1712 CNCActiveHandler::State
1732 CNCActiveHandler::State
1735 list<CTempString> tokens;
1737 if (tokens.size() != 2 && tokens.size() != 3)
1740 list<CTempString>::const_iterator it_tok = tokens.begin();
1741 Uint8 local_rec_no = 0, remote_rec_no = 0;
1746 if (it_tok != tokens.end())
1763 CNCActiveHandler::State
1777 if (line.
empty() || line ==
";") {
1791 CNCActiveHandler::State
1812 CNCActiveHandler::State
1836 data +=
sizeof(evt->
rec_no);
1855 CNCActiveHandler::State
1876 CNCActiveHandler::State
1888 +
sizeof(blob_sum->
expire)
1906 memcpy(&blob_sum->
expire, data,
sizeof(blob_sum->
expire));
1907 data +=
sizeof(blob_sum->
expire);
1919 CNCActiveHandler::State
1957 CNCActiveHandler::State
1977 CNCActiveHandler::State
1980 list<CTempString> tokens;
1982 if (tokens.size() != 11) {
1985 list<CTempString>::const_iterator it_tok = tokens.begin();
2018 CNCActiveHandler::State
2027 m_ErrMsg =
"ERR:Error writing blob to database";
2064 CNCActiveHandler::State
2081 list<CTempString> tokens;
2083 if (tokens.size() != 7)
2086 list<CTempString>::const_iterator it_tok = tokens.begin();
2113 CNCActiveHandler::State
2118 list<CTempString> tokens;
2120 ITERATE( list<CTempString>,
t, tokens) {
2124 list<CTempString> v;
2125 ncbi_NStr_Split(two,
".", v);
2126 if (v.size() >= 3) {
2127 for (
int i=0;
i<3; ++
i) {
2151 CNCActiveHandler::State
2167 bool need_event =
false;
2197 CNCActiveHandler::State
2265 CNCActiveHandler::State
2289 CNCActiveHandler::State
2296 Uint8(0xFFFFFFFE)));
2305 m_ErrMsg =
"ERR:Blob data is corrupted";
2324 CNCActiveHandler::State
2332 CNCActiveHandler::State
2346 CNCActiveHandler::State
static string s_PeerAuthString
Uint4 GetDefaultTaskPriority(void)
CNCActiveHandler * m_Handler
CNCMessageHandler * m_Client
CNCMessageHandler * GetClient(void)
CNCActiveClientHub(CNCMessageHandler *client)
virtual void ExecuteRCU(void)
Method implementing RCU job that was scheduled earlier by CallRCU().
void SetStatus(ENCClientHubStatus status)
static CNCActiveClientHub * Create(Uint8 srv_id, CNCMessageHandler *client)
ENCClientHubStatus m_Status
void SetErrMsg(const string &msg)
string GetFullPeerName(void)
void NeedToProxySocket(void)
CNCActiveHandler_Proxy(CNCActiveHandler *handler)
bool SocketProxyDone(void)
virtual void ExecuteSlice(TSrvThreadNum thr_num)
This is the main method to do all work this task should do.
virtual ~CNCActiveHandler_Proxy(void)
CNCActiveHandler * m_Handler
void SetHandler(CNCActiveHandler *handler)
State x_ProcessProtocolError(void)
State x_ReadSyncStartAnswer(void)
void x_CleanCmdResources(void)
State x_ReadCopyProlong(void)
bool IsReservedForBG(void)
State x_ReadBlobsListBody(void)
void x_StartWritingBlob(void)
void ProxyHasBlob(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, Uint1 quorum)
State x_SendCopyPutCmd(void)
void ProxySetValid(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version)
CNCBlobAccessor * m_BlobAccess
State x_ReplaceServerConn(void)
void SyncProlongPeer(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
void SyncRead(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
State x_ReadSyncProInfoAnswer(void)
State x_ProcessPeerError(void)
State x_ReadBlobData(void)
void SyncSend(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
State x_SendSyncGetCmd(void)
void CopyPurge(CRequestContext *cmd_ctx, const CNCBlobKeyLight &key, Uint8 when)
void ProxyWrite(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint4 ttl, Uint1 quorum, TNCUserFlags flags)
bool x_StartProcessing(void)
State x_ReadSyncStartExtra(void)
State x_FakeWritingBlob(void)
CNCActiveClientHub * m_Client
CNCActiveHandler_Proxy * m_Proxy
State x_ReadEventsListBody(void)
State x_SendCmdToExecute(void)
void CopyUpdate(const CNCBlobKeyLight &key, Uint8 create_time)
void ProxyBList(CRequestContext *cmd_ctx, const CNCBlobKey &key, bool force_local, SNCBlobFilter *filters)
void ProxyRead(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint8 start_pos, Uint8 size, Uint1 quorum, bool search, bool force_local, Uint8 age)
State x_ReadConfirm(void)
State x_ReadBlobsListKeySize(void)
State x_FinishWritingBlob(void)
void SetReservedForBG(bool value)
State x_ReadCopyPut(void)
void SetClientHub(CNCActiveClientHub *hub)
void x_SendCopyProlongCmd(const SNCBlobSummary &blob_sum)
CNCActiveHandler(Uint8 srv_id, CNCPeerControl *peer)
void SyncCancel(CNCActiveSyncControl *ctrl)
void CopyRemove(const CNCBlobKeyLight &key, Uint8 create_time)
State x_ReadSyncGetAnswer(void)
State x_FinishCommand(void)
State x_WaitForMetaInfo(void)
void SyncStart(CNCActiveSyncControl *ctrl, Uint8 local_rec_no, Uint8 remote_rec_no)
void ProxyRemove(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint1 quorum)
State x_PrepareSyncProlongCmd(void)
State x_ReadEventsListKeySize(void)
ESynActionType m_SyncAction
State x_ReadHttpDataPrefix(void)
State x_WriteBlobData(void)
void SyncCommit(CNCActiveSyncControl *ctrl, Uint8 local_rec_no, Uint8 remote_rec_no)
State x_ReadFoundMeta(void)
CNCActiveSyncControl * m_SyncCtrl
void ClientReleased(void)
State x_ConnClosedReplaceable(void)
void AskPeerVersion(void)
void ProxyGetMeta(CRequestContext *cmd_ctx, const CNCBlobKey &key, Uint1 quorum, bool force_local, int http)
static void Initialize(void)
void ProxyProlong(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, unsigned int add_time, Uint1 quorum, bool search, bool force_local)
void SetProxy(CNCActiveHandler_Proxy *proxy)
void x_SetStateAndStartProcessing(State state)
State x_ExecuteProInfoCmd(void)
void x_FinishSyncCmd(ESyncResult result, int hint)
void ProxyReadLast(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, Uint8 start_pos, Uint8 size, Uint1 quorum, bool search, bool force_local, Uint8 age)
State x_InvalidState(void)
void SyncBlobsList(CNCActiveSyncControl *ctrl)
void CloseForShutdown(void)
State x_CloseCmdAndConn(void)
State x_FinishBlobFromClient(void)
void SyncProlongOur(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
void ProxyGetSize(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint1 quorum, bool search, bool force_local)
State x_ReadDataPrefix(void)
State x_WaitOneLineAnswer(void)
void CheckCommandTimeout(void)
State x_ReadWritePrefix(void)
CNCBlobKeyLight m_BlobKey
State x_ReadPeerVersion(void)
void CopyProlong(const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no, Uint8 orig_time, const SNCBlobSummary &blob_sum)
Uint8 GetSrvId(void) const
State x_WaitClientRelease(void)
void CopyPut(CRequestContext *cmd_ctx, const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no)
State x_ReadSyncGetHeader(void)
void x_DoProlongOur(void)
State x_MayDeleteThis(void)
State x_ReadSizeToRead(void)
virtual ~CNCActiveHandler(void)
void x_SetSlotAndBucketAndVerifySlot(Uint2 slot)
void SearchMeta(CRequestContext *cmd_ctx, const CNCBlobKey &key)
State x_ReadDataForClient(void)
State x_ReadSyncStartHeader(void)
State x_PutSelfToPool(void)
bool AddStartEvent(SNCSyncEvent *evt)
bool AddStartBlob(const string &key, SNCBlobSummary *blob_sum)
void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler *conn, int hint)
void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs)
static void Register(EAlertType alert_type, const string &message)
void * GetWriteMemPtr(void)
Uint8 GetCurBlobSize(void) const
Get size of the blob.
void RequestMetaInfo(CSrvTask *owner)
void SetNewVerExpire(int dead_time)
void MoveReadPos(Uint4 move_size)
bool IsMetaInfoReady(void)
size_t GetWriteMemSize(void)
void SetCurBlobExpire(int expire, int dead_time=0)
unsigned int GetCurBlobTTL(void) const
void SetCurVerExpire(int dead_time)
unsigned int GetCurVersionTTL(void) const
void SetPassword(CTempString password)
bool IsCurBlobDead(void) const
void SetNewBlobExpire(int expire, int dead_time=0)
int GetCurBlobDeadTime(void) const
void SetVersionTTL(int ttl)
static bool UpdatePurgeData(const string &data, char separator='\n')
Uint4 GetCurCreateId(void) const
void SetBlobTTL(unsigned int ttl)
Set blob's timeout after last access before it will be deleted.
Uint8 GetCurBlobCreateTime(void) const
Uint8 GetCurCreateServer(void) const
void SetCreateServer(Uint8 create_server, Uint4 create_id)
void MoveWritePos(Uint4 move_size)
int GetCurBlobVersion(void) const
Uint4 GetReadMemSize(void)
int GetCurBlobExpire(void) const
string GetCurPassword(void) const
void SetPosition(Uint8 pos)
Initially set current position in the blob to start reading from.
int GetCurVerExpire(void) const
bool HasError(void) const
void SetBlobVersion(int ver)
void SetBlobCreateTime(Uint8 create_time)
const void * GetReadMemPtr(void)
bool IsBlobExists(void) const
Check if blob exists.
void Release(void)
Release blob lock.
const string & PackedKey(void) const
const CTempString & RawKey(void) const
const CTempString & SubKey(void) const
const CTempString & Cache(void) const
static void SavePurgeData(void)
static CNCBlobAccessor * GetBlobAccess(ENCAccessType access, const string &key, const string &password, Uint2 time_bucket)
Acquire access to the blob identified by key, subkey and version.
static Uint4 GetSyncPriority(void)
static string GetFullPeerName(Uint8 srv_id)
static Uint8 GetNetworkErrorTimeout(void)
static Uint1 GetBlobListTimeout(void)
static Uint8 GetSelfID(void)
static Uint1 GetPeerTimeout(void)
Handler of all NetCache incoming requests.
bool IsBlobWritingFinished(void)
void AbortInitialSync(void)
void RegisterConnError(void)
void RegisterConnSuccess(void)
void PutConnToPool(CNCActiveHandler *conn)
CNCActiveHandler * GetPooledConn(void)
bool AcceptsUserFlags(void) const
void ReleaseConn(CNCActiveHandler *conn)
bool CreateNewSocket(CNCActiveHandler *conn)
void SetHostProtocol(Uint8 ver)
void AssignClientConn(CNCActiveClientHub *hub)
static CNCPeerControl * Peer(Uint8 srv_id)
static void ClientDataRead(size_t data_size)
static void PeerDataWrite(size_t data_size)
static void PeerDataRead(size_t data_size)
static Uint8 AddEvent(Uint2 slot, SNCSyncEvent *event)
const value_type * data() const
void resize_mem(size_type new_size)
Resize the buffer. No data preservation.
void CallRCU(void)
Method to be called to schedule call of ExecuteRCU() at appropriate time.
Special variant of CRef that doesn't check for NULL when dereferencing.
bool IsProxyInProgress(void)
Check whether proxying started earlier is still in progress.
bool NeedToClose(void)
Checks if socket should be closed because of long inactivity or because server is in "hard" shutdown ...
size_t Write(const void *buf, size_t size)
Write into the socket as much as immediately possible (including writing into internal write buffers ...
void AbortSocket(void)
Abort the socket, i.e.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
bool ProxyHadError(void)
Check whether proxying started earlier finished successfully or any of sockets had some error in it.
virtual void Terminate(void)
Terminate the task.
bool m_NeedToClose
Flag showing that socket needs to be closed because of long inactivity.
bool ReadData(void *buf, Uint2 size)
Read from socket exactly the given data size.
void RequestFlush(void)
Request flushing of all data saved in internal write buffers to socket.
size_t Read(void *buf, size_t size)
Read from socket into memory.
bool ReadNumber(NumType *num)
Read from socket a number in native machine representation.
void WriteData(const void *buf, size_t size)
Write the exact amount of data into the socket.
bool StartProcessing(TSrvThreadNum thread_num=0, bool boost=false)
Start processing of the socket and include it into TaskServer's central epoll.
bool ReadLine(CTempString *line)
Read from socket one line which ends with ' ', '\r ' or '\0'.
bool NeedEarlyClose(void)
Checks if socket should be closed because of internal reasons (long inactivity or "hard" shutdown as ...
void StartProxyTo(CSrvSocketTask *dst_task, Uint8 proxy_size)
Start proxying of raw data from this socket to the one in dst_task.
bool FlushIsDone(void)
Check if data flushing requested earlier is complete.
void SetState(State state)
Sets current state of the machine.
void SetDiagCtx(CRequestContext *ctx)
Set diagnostic context for this task to work in.
int m_LastActive
Time (in seconds) when the task was active last time, i.e.
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
void SetPriority(Uint4 prty)
Set and retrieve task's priority.
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
virtual void Terminate(void)
Stops task's execution and deletes it.
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
static int CurSecs(void)
Current time in seconds since epoch (time_t).
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
static void StartSyncBlob(Uint8 create_time)
void(*)(CSeq_entry_Handle seh, IWorkbench *wb, const CSerialObject &obj) handler
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
string GetSessionID(void) const
Session ID.
string GetClientIP(void) const
Client IP/hostname.
void SetRequestStatus(int status)
void Critical(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
uint8_t Uint1
1-byte (8-bit) unsigned integer
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
int64_t Int8
8-byte (64-bit) signed integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
NCBI_NS_STD::string::size_type SIZE_TYPE
static string Int8ToString(Int8 value, TNumToStringFlags flags=0, int base=10)
Convert Int8 to string.
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
bool empty(void) const
Return true if the represented string is empty (i.e., the length is zero)
static SIZE_TYPE FindCase(const CTempString str, const CTempString pattern, SIZE_TYPE start, SIZE_TYPE end, EOccurrence which=eFirst)
Find the pattern in the specified range of a string using a case sensitive search.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
void clear(void)
Clears the string.
static Uint8 StringToUInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Uint8.
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
CTempString substr(size_type pos) const
Obtain a substring from this string, beginning at a given offset.
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
size_type find(const CTempString match, size_type pos=0) const
Find the first instance of the entire matching string within the current string, beginning at an opti...
size_type size(void) const
Return the length of the represented array.
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
static const size_type npos
@ fAllowTrailingSymbols
Ignore trailing non-numerics characters.
@ fAllowLeadingSpaces
Ignore leading spaces in converted string.
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const string & GetMessageByStatus(EHTTPStatus sts)
static const char *const kNCPeerClientName
@ eStatus_OK
Command is ok and execution is good.
@ eStatus_BadPeer
Peer returned something wrong.
@ eNCReadData
Read blob data.
@ eNCCopyCreate
(Re-)write blob from another NetCache (as opposed to writing from client)
@ eNCRead
Read meta information only.
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
#define ACCESS_ONCE(x)
Purpose of this macro is to force compiler to access variable exactly at the place it's written (no m...
ENCSyncEvent
Event types to log.
Uint2 TSrvThreadNum
Type for thread number in TaskServer.