NCBI C++ ToolKit
active_handler.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: active_handler.cpp 83741 2018-09-14 12:35:21Z gouriano $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Pavel Ivanov
27  *
28  */
29 
30 #include "nc_pch.hpp"
31 
32 #include <corelib/request_ctx.hpp>
33 
34 #include "netcached.hpp"
35 #include "active_handler.hpp"
36 #include "peer_control.hpp"
37 #include "periodic_sync.hpp"
38 #include "nc_storage.hpp"
39 #include "nc_storage_blob.hpp"
40 #include "message_handler.hpp"
41 #include "nc_stat.hpp"
42 
43 
45 
46 
47 static string s_PeerAuthString;
48 
49 
50 
51 //static Uint8 s_CntProxys = 0;
52 
54  : m_Handler(handler),
55  m_NeedToProxy(false)
56 {
57 #if __NC_TASKS_MONITOR
58  m_TaskName = "CNCActiveHandler_Proxy";
59 #endif
60 
61  //Uint8 cnt = AtomicAdd(s_CntProxys, 1);
62  //INFO("CNCActiveHandler_Proxy, cnt=" << cnt);
63 }
64 
66 {
67  //Uint8 cnt = AtomicSub(s_CntProxys, 1);
68  //INFO("~CNCActiveHandler_Proxy, cnt=" << cnt);
69 }
70 
71 void
73 {
74  // Lots of checks for NULL follows because these pointers can be nullified
75  // concurrently at any moment.
77  if (handler) {
78  // All events coming to this proxy should be transfered to
79  // CNCActiveHandler except when we need to proxy data from this socket
80  // (from another NC server) to the client.
81  if (m_NeedToProxy) {
83  if (client)
84  StartProxyTo(client->GetClient(), handler->m_SizeToRead);
85  m_NeedToProxy = false;
86  if (IsProxyInProgress())
87  return;
88  }
89  handler->SetRunnable();
90  }
91  else
92  Terminate();
93 }
94 
95 void
97 {
98  m_NeedToProxy = true;
99  SetRunnable();
100 }
101 
102 inline bool
104 {
105  return !m_NeedToProxy && !IsProxyInProgress();
106 }
107 
108 
111 {
113  CNCPeerControl* peer = CNCPeerControl::Peer(srv_id);
114  peer->AssignClientConn(hub);
115  return hub;
116 }
117 
118 void
120 {
121  if (m_Handler)
123  CallRCU();
124 }
125 
126 string
128 {
129  if (m_Handler) {
131  }
132  return "";
133 }
134 
135 void
137 {
138  ACCESS_ONCE(m_Status) = status;
139  if (status != eNCHubCmdInProgress)
141 }
142 
143 void
145 {
146  delete this;
147 }
148 
149 
150 //static Uint8 s_CntHndls = 0;
151 
153  : m_SrvId(srv_id),
154  m_Peer(peer),
155  m_Client(NULL),
156  m_SyncCtrl(NULL),
157  m_CntCmds(0),
158  m_BlobAccess(NULL),
159  m_ReservedForBG(false),
160  m_ProcessingStarted(false),
161  m_CmdStarted(false),
162  m_GotAnyAnswer(false),
163  m_CmdFromClient(false),
164  m_Purge(false)
165 {
166 #if __NC_TASKS_MONITOR
167  m_TaskName = "CNCActiveHandler";
168 #endif
169  // Right after creation CNCActiveHandler shouldn't become runnable until
170  // it's assigned to CNCActiveHandlerHub or some command-executing method
171  // (like CopyPut(), SyncStart() etc.) is called.
173 
174  //Uint8 cnt = AtomicAdd(s_CntHndls, 1);
175  //INFO("CNCActiveHandler, cnt=" << cnt);
176  ResetSizeRdWr();
177 }
178 
180 {
181  while (GetDiagCtx()) {
182  ReleaseDiagCtx();
183  }
184  //Uint8 cnt = AtomicSub(s_CntHndls, 1);
185  //INFO("~CNCActiveHandler, cnt=" << cnt);
186 }
187 
188 void
190 {
192  s_PeerAuthString += " srv_id=";
194  s_PeerAuthString += "\n";
195 }
196 
197 CNCActiveHandler::State
199 {
200  SRV_FATAL("CNCActiveHandler started in invalid state");
201  return NULL;
202 }
203 
204 void
206 {
207  m_Client = hub;
208  m_CmdFromClient = true;
209  // Client can release this connection before giving any command and thus
210  // before changing state to anything different from x_InvalidState.
211  // To gracefully handle such situation we need to be in x_WaitClientRelease
212  // state.
214 }
215 
216 void
218 {
220 }
221 
222 void
224 {
225  m_Client = NULL;
226  SetRunnable();
227 }
228 
229 CNCActiveHandler::State
231 {
232  if (!m_Client)
233  Terminate();
234  return NULL;
235 }
236 
237 void
239 {
240  m_Proxy = proxy;
241  SetDiagCtx(proxy->GetDiagCtx());
242  m_ProcessingStarted = false;
243  m_GotAnyAnswer = false;
244  m_GotCmdAnswer = false;
245  m_GotClientResponse = false;
246  // SetProxy() will be called only when socket (CNCActiveHandler_Proxy) have
247  // been just created. So first line have to be client authentication. And
248  // although we call WriteText() right here this text won't go to the socket
249  // until we put first command in there and call Flush(). And we can't call
250  // Flush() here (even if we wanted to) because socket could be not writable
251  // yet.
252  proxy->WriteText(s_PeerAuthString);
253 }
254 
255 void
257 {
260 }
261 
262 bool
264 {
265  return m_ReservedForBG;
266 }
267 
268 CNCActiveHandler::State
270 {
271  // In the stack of diag contexts we always have socket's context first and
272  // command's context second. Here we change socket thus we need to change
273  // socket's context. To do that we need to "release" command's context
274  // first (if there is any -- some commands execute without command's
275  // context), then release (this time for real) socket's context, then put
276  // new socket's context, then put command's context (if there is any).
277  CRequestContext* proxy_ctx = m_Proxy->GetDiagCtx();
279  if (ctx == proxy_ctx)
280  ctx.Reset();
281  else
282  ReleaseDiagCtx();
283  ReleaseDiagCtx();
284 
286  m_Proxy->SetRunnable();
287  m_Proxy = NULL;
288 
289  CNCActiveHandler* src_handler = m_Peer->GetPooledConn();
290  if (src_handler) {
291  if (!src_handler->m_ProcessingStarted) {
292  SRV_LOG(Critical, "src_handler in invalid state");
293  }
294  m_ProcessingStarted = true;
295  m_GotAnyAnswer = src_handler->m_GotAnyAnswer;
296  m_Proxy = src_handler->m_Proxy;
297  m_Proxy->SetHandler(this);
299  if (ctx)
300  SetDiagCtx(ctx);
301  src_handler->ReleaseDiagCtx();
302  m_Peer->ReleaseConn(src_handler);
303  src_handler->Terminate();
305  }
306  if (m_Peer->CreateNewSocket(this)) {
307  if (ctx)
308  SetDiagCtx(ctx);
309  if (x_StartProcessing())
311  }
312 
313  if (ctx)
314  SetDiagCtx(ctx);
316 }
317 
318 void
320 {
321  SetDiagCtx(cmd_ctx);
323 
324  m_CmdToSend.resize(0);
325  m_CmdToSend += "PROXY_META \"";
326  m_CmdToSend += nc_key.Cache();
327  m_CmdToSend += "\" \"";
328  m_CmdToSend += nc_key.RawKey();
329  m_CmdToSend += "\" \"";
330  m_CmdToSend += nc_key.SubKey();
331  m_CmdToSend += "\" \"";
332  m_CmdToSend += cmd_ctx->GetClientIP();
333  m_CmdToSend += "\" \"";
334  m_CmdToSend += cmd_ctx->GetSessionID();
335  m_CmdToSend.append(1, '"');
336 
338 }
339 
341 {
343  m_CmdToSend.resize(0);
344  m_CmdToSend += "VERSION";
346 }
347 
348 void
350  const CNCBlobKeyLight& nc_key,
351  Uint8 when)
352 {
353  bool p2 = !nc_key.RawKey().empty();
354  if (cmd_ctx)
355  SetDiagCtx(cmd_ctx);
357  m_CmdToSend.resize(0);
358  m_CmdToSend += p2 ? "COPY_PURGE2" : "COPY_PURGE";
359  m_CmdToSend += " \"";
360  m_CmdToSend += nc_key.Cache();
361  m_CmdToSend += "\" ";
362  if (p2) {
363  m_CmdToSend += " \"";
364  m_CmdToSend += nc_key.RawKey();
365  m_CmdToSend += "\" ";
366  }
369 }
370 
371 void
373 {
375 
376  m_CmdToSend.resize(0);
377  m_CmdToSend += "COPY_UPD \"";
378  m_CmdToSend += nc_key.Cache();
379  m_CmdToSend += "\" \"";
380  m_CmdToSend += nc_key.RawKey();
381  m_CmdToSend += "\" \"";
382  m_CmdToSend += nc_key.SubKey();
383  m_CmdToSend += "\" ";
384  m_CmdToSend += NStr::UInt8ToString(create_time);
385  m_CmdToSend += " ";
386 // m_CmdToSend += NStr::UInt8ToString(blob_sum.create_server);
388 
390 }
391 
392 void CNCActiveHandler::CopyRemove(const CNCBlobKeyLight& nc_key, Uint8 create_time)
393 {
395 
396  m_CmdToSend.resize(0);
397  m_CmdToSend += "COPY_RMV \"";
398  m_CmdToSend += nc_key.Cache();
399  m_CmdToSend += "\" \"";
400  m_CmdToSend += nc_key.RawKey();
401  m_CmdToSend += "\" \"";
402  m_CmdToSend += nc_key.SubKey();
403  m_CmdToSend += "\" ";
404  m_CmdToSend += NStr::UInt8ToString(create_time);
405  m_CmdToSend += " ";
407 
409 }
410 
411 void
413  const CNCBlobKeyLight& key,
414  Uint2 slot,
415  Uint8 orig_rec_no)
416 {
417  if (cmd_ctx)
418  SetDiagCtx(cmd_ctx);
419  m_BlobKey = key;
421  m_OrigRecNo = orig_rec_no;
422  if (m_Client)
424  x_DoCopyPut();
425 }
426 
427 void
429  const CNCBlobKey& nc_key,
430  const string& password,
431  int version,
432  Uint1 quorum)
433 {
434  SetDiagCtx(cmd_ctx);
436 
437  m_CmdToSend.resize(0);
438  m_CmdToSend += "PROXY_RMV \"";
439  m_CmdToSend += nc_key.Cache();
440  m_CmdToSend += "\" \"";
441  m_CmdToSend += nc_key.RawKey();
442  m_CmdToSend += "\" \"";
443  m_CmdToSend += nc_key.SubKey();
444  m_CmdToSend += "\" ";
446  m_CmdToSend.append(1, ' ');
447  m_CmdToSend += NStr::UIntToString(quorum);
448  m_CmdToSend += " \"";
449  m_CmdToSend += cmd_ctx->GetClientIP();
450  m_CmdToSend += "\" \"";
451  m_CmdToSend += cmd_ctx->GetSessionID();
452  m_CmdToSend.append(1, '"');
453  if (!password.empty()) {
454  m_CmdToSend += " \"";
455  m_CmdToSend += password;
456  m_CmdToSend.append(1, '"');
457  }
458 
460 }
461 
462 void
464  const CNCBlobKey& nc_key,
465  const string& password,
466  Uint1 quorum)
467 {
468  SetDiagCtx(cmd_ctx);
470 
471  m_CmdToSend.resize(0);
472  m_CmdToSend += "PROXY_HASB \"";
473  m_CmdToSend += nc_key.Cache();
474  m_CmdToSend += "\" \"";
475  m_CmdToSend += nc_key.RawKey();
476  m_CmdToSend += "\" \"";
477  m_CmdToSend += nc_key.SubKey();
478  m_CmdToSend += "\" ";
479  m_CmdToSend += NStr::UIntToString(quorum);
480  m_CmdToSend += " \"";
481  m_CmdToSend += cmd_ctx->GetClientIP();
482  m_CmdToSend += "\" \"";
483  m_CmdToSend += cmd_ctx->GetSessionID();
484  m_CmdToSend.append(1, '"');
485  if (!password.empty()) {
486  m_CmdToSend += " \"";
487  m_CmdToSend += password;
488  m_CmdToSend.append(1, '"');
489  }
490 
492 }
493 
494 void
496  const CNCBlobKey& nc_key,
497  const string& password,
498  int version,
499  Uint1 quorum,
500  bool search,
501  bool force_local)
502 {
503  SetDiagCtx(cmd_ctx);
505 
506  m_CmdToSend.resize(0);
507  m_CmdToSend += "PROXY_GSIZ \"";
508  m_CmdToSend += nc_key.Cache();
509  m_CmdToSend += "\" \"";
510  m_CmdToSend += nc_key.RawKey();
511  m_CmdToSend += "\" \"";
512  m_CmdToSend += nc_key.SubKey();
513  m_CmdToSend += "\" ";
515  m_CmdToSend.append(1, ' ');
516  m_CmdToSend += NStr::UIntToString(quorum);
517  m_CmdToSend.append(1, ' ');
519  m_CmdToSend.append(1, ' ');
520  m_CmdToSend += NStr::UIntToString(Uint1(force_local));
521  m_CmdToSend += " \"";
522  m_CmdToSend += cmd_ctx->GetClientIP();
523  m_CmdToSend += "\" \"";
524  m_CmdToSend += cmd_ctx->GetSessionID();
525  m_CmdToSend.append(1, '"');
526  if (!password.empty()) {
527  m_CmdToSend += " \"";
528  m_CmdToSend += password;
529  m_CmdToSend.append(1, '"');
530  }
531 
533 }
534 
535 void
537  const CNCBlobKey& nc_key,
538  const string& password,
539  int version)
540 {
541  SetDiagCtx(cmd_ctx);
543 
544  m_CmdToSend.resize(0);
545  m_CmdToSend += "PROXY_SETVALID \"";
546  m_CmdToSend += nc_key.Cache();
547  m_CmdToSend += "\" \"";
548  m_CmdToSend += nc_key.RawKey();
549  m_CmdToSend += "\" \"";
550  m_CmdToSend += nc_key.SubKey();
551  m_CmdToSend += "\" ";
553  m_CmdToSend += " \"";
554  m_CmdToSend += cmd_ctx->GetClientIP();
555  m_CmdToSend += "\" \"";
556  m_CmdToSend += cmd_ctx->GetSessionID();
557  m_CmdToSend.append(1, '"');
558  if (!password.empty()) {
559  m_CmdToSend += " \"";
560  m_CmdToSend += password;
561  m_CmdToSend.append(1, '"');
562  }
563 
565 }
566 
567 void
569  const CNCBlobKey& nc_key,
570  const string& password,
571  int version,
572  Uint8 start_pos,
573  Uint8 size,
574  Uint1 quorum,
575  bool search,
576  bool force_local,
577  Uint8 age)
578 {
579  SetDiagCtx(cmd_ctx);
581 
582  m_CmdToSend.resize(0);
583  m_CmdToSend += "PROXY_GET \"";
584  m_CmdToSend += nc_key.Cache();
585  m_CmdToSend += "\" \"";
586  m_CmdToSend += nc_key.RawKey();
587  m_CmdToSend += "\" \"";
588  m_CmdToSend += nc_key.SubKey();
589  m_CmdToSend += "\" ";
591  m_CmdToSend.append(1, ' ');
592  m_CmdToSend += NStr::UInt8ToString(start_pos);
593  m_CmdToSend.append(1, ' ');
595  m_CmdToSend.append(1, ' ');
596  m_CmdToSend += NStr::UIntToString(quorum);
597  m_CmdToSend.append(1, ' ');
599  m_CmdToSend.append(1, ' ');
600  m_CmdToSend += NStr::UIntToString(Uint1(force_local));
601  m_CmdToSend += " \"";
602  m_CmdToSend += cmd_ctx->GetClientIP();
603  m_CmdToSend += "\" \"";
604  m_CmdToSend += cmd_ctx->GetSessionID();
605  m_CmdToSend.append(1, '"');
606  if (!password.empty()) {
607  m_CmdToSend += " \"";
608  m_CmdToSend += password;
609  m_CmdToSend.append(1, '"');
610  }
611  if (age != 0) {
612  m_CmdToSend += " age=";
614  }
615 
617 }
618 
619 void
621  const CNCBlobKey& nc_key,
622  const string& password,
623  Uint8 start_pos,
624  Uint8 size,
625  Uint1 quorum,
626  bool search,
627  bool force_local,
628  Uint8 age)
629 {
630  SetDiagCtx(cmd_ctx);
632 
633  m_CmdToSend.resize(0);
634  m_CmdToSend += "PROXY_READLAST \"";
635  m_CmdToSend += nc_key.Cache();
636  m_CmdToSend += "\" \"";
637  m_CmdToSend += nc_key.RawKey();
638  m_CmdToSend += "\" \"";
639  m_CmdToSend += nc_key.SubKey();
640  m_CmdToSend += "\" ";
641  m_CmdToSend += NStr::UInt8ToString(start_pos);
642  m_CmdToSend.append(1, ' ');
644  m_CmdToSend.append(1, ' ');
645  m_CmdToSend += NStr::UIntToString(quorum);
646  m_CmdToSend.append(1, ' ');
648  m_CmdToSend.append(1, ' ');
649  m_CmdToSend += NStr::UIntToString(Uint1(force_local));
650  m_CmdToSend += " \"";
651  m_CmdToSend += cmd_ctx->GetClientIP();
652  m_CmdToSend += "\" \"";
653  m_CmdToSend += cmd_ctx->GetSessionID();
654  m_CmdToSend.append(1, '"');
655  if (!password.empty()) {
656  m_CmdToSend += " \"";
657  m_CmdToSend += password;
658  m_CmdToSend.append(1, '"');
659  }
660  if (age != 0) {
661  m_CmdToSend += " age=";
663  }
664 
666 }
667 
668 void
670  const CNCBlobKey& nc_key,
671  Uint1 quorum,
672  bool force_local,
673  int http)
674 {
675  SetDiagCtx(cmd_ctx);
677 
678  m_CmdToSend.resize(0);
679  m_CmdToSend += "PROXY_GETMETA \"";
680  m_CmdToSend += nc_key.Cache();
681  m_CmdToSend += "\" \"";
682  m_CmdToSend += nc_key.RawKey();
683  m_CmdToSend += "\" \"";
684  m_CmdToSend += nc_key.SubKey();
685  m_CmdToSend += "\" ";
686  m_CmdToSend += NStr::UIntToString(quorum);
687  m_CmdToSend.append(1, ' ');
688  m_CmdToSend += NStr::UIntToString(Uint1(force_local));
689  m_CmdToSend += " \"";
690  m_CmdToSend += cmd_ctx->GetClientIP();
691  m_CmdToSend += "\" \"";
692  m_CmdToSend += cmd_ctx->GetSessionID();
693  m_CmdToSend.append(1, '"');
694  if (http != 0) {
695  m_CmdToSend += " http=";
697  }
698 
700 }
701 
702 void
704  const CNCBlobKey& nc_key,
705  const string& password,
706  int version,
707  Uint4 ttl,
708  Uint1 quorum,
710 {
711  SetDiagCtx(cmd_ctx);
713 
714  m_CmdToSend.resize(0);
715  m_CmdToSend += "PROXY_PUT \"";
716  m_CmdToSend += nc_key.Cache();
717  m_CmdToSend += "\" \"";
718  m_CmdToSend += nc_key.RawKey();
719  m_CmdToSend += "\" \"";
720  m_CmdToSend += nc_key.SubKey();
721  m_CmdToSend += "\" ";
723  m_CmdToSend.append(1, ' ');
725  m_CmdToSend.append(1, ' ');
726  m_CmdToSend += NStr::UIntToString(quorum);
727  m_CmdToSend += " \"";
728  m_CmdToSend += cmd_ctx->GetClientIP();
729  m_CmdToSend += "\" \"";
730  m_CmdToSend += cmd_ctx->GetSessionID();
731  m_CmdToSend.append(1, '"');
732  if (m_Peer->AcceptsUserFlags()) {
733  m_CmdToSend += " flags=";
735  }
736  if (!password.empty()) {
737  m_CmdToSend += " \"";
738  m_CmdToSend += password;
739  m_CmdToSend.append(1, '"');
740  }
741 
743 }
744 
745 void
747  const CNCBlobKey& nc_key,
748  const string& password,
749  unsigned int add_time,
750  Uint1 quorum,
751  bool search,
752  bool force_local)
753 {
754  SetDiagCtx(cmd_ctx);
756 
757  m_CmdToSend.resize(0);
758  m_CmdToSend += "PROXY_PROLONG \"";
759  m_CmdToSend += nc_key.Cache();
760  m_CmdToSend += "\" \"";
761  m_CmdToSend += nc_key.RawKey();
762  m_CmdToSend += "\" \"";
763  m_CmdToSend += nc_key.SubKey();
764  m_CmdToSend += "\" ";
765  m_CmdToSend += NStr::IntToString(add_time);
766  m_CmdToSend.append(1, ' ');
767  m_CmdToSend += NStr::UIntToString(quorum);
768  m_CmdToSend.append(1, ' ');
770  m_CmdToSend.append(1, ' ');
771  m_CmdToSend += NStr::UIntToString(Uint1(force_local));
772  m_CmdToSend += " \"";
773  m_CmdToSend += cmd_ctx->GetClientIP();
774  m_CmdToSend += "\" \"";
775  m_CmdToSend += cmd_ctx->GetSessionID();
776  m_CmdToSend.append(1, '"');
777  if (!password.empty()) {
778  m_CmdToSend += " \"";
779  m_CmdToSend += password;
780  m_CmdToSend.append(1, '"');
781  }
782 
784 }
785 
787  CRequestContext* cmd_ctx, const CNCBlobKey& nc_key, bool force_local,
788  SNCBlobFilter* filters)
789 {
790  SetDiagCtx(cmd_ctx);
792 
793  m_CmdToSend.resize(0);
794  m_CmdToSend += filters ? "PROXY_BLIST2 \"" : "PROXY_BLIST \"";
795  m_CmdToSend += nc_key.Cache();
796  m_CmdToSend += "\" \"";
797  m_CmdToSend += nc_key.RawKey();
798  m_CmdToSend += "\" \"";
799  m_CmdToSend += nc_key.SubKey();
800  m_CmdToSend += "\" ";
801  m_CmdToSend += NStr::UIntToString(Uint1(force_local));
802  if (filters) {
803  if (filters->cr_ago_ge != 0) {
804  m_CmdToSend += " fcr_ago_ge=";
806  }
807  if (filters->cr_ago_lt != 0) {
808  m_CmdToSend += " fcr_ago_lt=";
810  }
811  if (filters->cr_epoch_ge != 0) {
812  m_CmdToSend += " fcr_epoch_ge=";
814  }
815  if (filters->cr_epoch_lt != 0) {
816  m_CmdToSend += " fcr_epoch_lt=";
818  }
819  if (filters->exp_now_ge != 0) {
820  m_CmdToSend += " fexp_now_ge=";
822  }
823  if (filters->exp_now_lt != 0) {
824  m_CmdToSend += " fexp_now_lt=";
826  }
827  if (filters->exp_epoch_ge != 0) {
828  m_CmdToSend += " fexp_epoch_ge=";
830  }
831  if (filters->exp_epoch_lt != 0) {
832  m_CmdToSend += " fexp_epoch_lt=";
834  }
835  if (filters->vexp_now_ge != 0) {
836  m_CmdToSend += " fvexp_now_ge=";
838  }
839  if (filters->vexp_now_lt != 0) {
840  m_CmdToSend += " fvexp_now_lt=";
842  }
843  if (filters->vexp_epoch_ge != 0) {
844  m_CmdToSend += " fvexp_epoch_ge=";
846  }
847  if (filters->vexp_epoch_lt != 0) {
848  m_CmdToSend += " fvexp_epoch_lt=";
850  }
851  if (filters->cr_srv != 0) {
852  m_CmdToSend += " fcr_srv=";
854  }
855  if (filters->size_ge != 0) {
856  m_CmdToSend += " fsize_ge=";
858  }
859  if (filters->size_lt != 0) {
860  m_CmdToSend += " fsize_lt=";
862  }
863  }
864 
866 }
867 
868 void
870  Uint2 slot,
871  Uint8 orig_rec_no,
872  Uint8 orig_time,
873  const SNCBlobSummary& blob_sum)
874 {
875  m_BlobSum = blob_sum;
876  m_BlobKey = key;
878  m_OrigTime = orig_time;
879  m_OrigRecNo = orig_rec_no;
881  x_SendCopyProlongCmd(blob_sum);
882 }
883 
884 void
886  Uint8 local_rec_no,
887  Uint8 remote_rec_no)
888 {
890  m_SyncCtrl = ctrl;
891  SetDiagCtx(ctrl->GetDiagCtx());
893 
894  m_CmdToSend.resize(0);
895  m_CmdToSend += "SYNC_START ";
897  m_CmdToSend.append(1, ' ');
899  m_CmdToSend.append(1, ' ');
900  m_CmdToSend += NStr::UInt8ToString(local_rec_no);
901  m_CmdToSend.append(1, ' ');
902  m_CmdToSend += NStr::UInt8ToString(remote_rec_no);
903 
905 }
906 
907 void
909 {
911  m_SyncCtrl = ctrl;
912  SetDiagCtx(ctrl->GetDiagCtx());
914 
915  m_CmdToSend.resize(0);
916  m_CmdToSend += "SYNC_BLIST ";
918  m_CmdToSend.append(1, ' ');
920 
922 }
923 
924 void
926 {
927  m_BlobSum.size = event->blob_size;
929  m_SyncCtrl = ctrl;
930  SetDiagCtx(ctrl->GetDiagCtx());
931  m_BlobKey = event->key;
933  m_OrigRecNo = event->orig_rec_no;
934  x_DoCopyPut();
935 }
936 
937 void
939 {
941  m_SyncCtrl = ctrl;
942  SetDiagCtx(ctrl->GetDiagCtx());
943  m_BlobKey = key;
945  m_OrigRecNo = 0;
946  x_DoCopyPut();
947 }
948 
949 void
951 {
952  m_BlobSum.size = event->blob_size;
953  m_SyncCtrl = ctrl;
954  m_BlobKey = event->key;
955  m_OrigTime = event->orig_time;
956  m_OrigRecNo = event->orig_rec_no;
957  m_OrigServer = event->orig_server;
958  x_DoSyncGet();
959 }
960 
961 void
963  const CNCBlobKeyLight& key,
964  Uint8 create_time)
965 {
966  m_SyncCtrl = ctrl;
967  m_BlobKey = key;
968  m_OrigTime = create_time;
969  m_OrigRecNo = 0;
970  x_DoSyncGet();
971 }
972 
973 void
975  SNCSyncEvent* event)
976 {
977  m_BlobSum.size = event->blob_size;
978  m_SyncCtrl = ctrl;
979  SetDiagCtx(ctrl->GetDiagCtx());
981  m_BlobKey = event->key;
982  m_OrigTime = event->orig_time;
983  m_OrigRecNo = event->orig_rec_no;
984  m_OrigServer = event->orig_server;
991 }
992 
993 void
995  const CNCBlobKeyLight& key,
996  const SNCBlobSummary& blob_sum)
997 {
998  m_BlobSum = blob_sum;
999  m_SyncCtrl = ctrl;
1000  SetDiagCtx(ctrl->GetDiagCtx());
1002  m_BlobKey = key;
1004  m_OrigTime = 0;
1005  m_OrigRecNo = 0;
1006  m_OrigServer = 0;
1007  x_SendCopyProlongCmd(blob_sum);
1008 }
1009 
1010 void
1012  SNCSyncEvent* event)
1013 {
1014  m_BlobSum.size = event->blob_size;
1015  m_SyncCtrl = ctrl;
1016  SetDiagCtx(ctrl->GetDiagCtx());
1018  m_BlobKey = event->key;
1020  m_OrigTime = event->orig_time;
1021  m_OrigRecNo = event->orig_rec_no;
1022  m_OrigServer = event->orig_server;
1024 
1025  m_CmdToSend.resize(0);
1026  m_CmdToSend += "SYNC_PROINFO ";
1028  m_CmdToSend.append(1, ' ');
1030  m_CmdToSend += " \"";
1032  m_CmdToSend += "\" \"";
1034  m_CmdToSend += "\" \"";
1036  m_CmdToSend += "\"";
1037 
1039 }
1040 
1041 void
1043  const string& key,
1044  const SNCBlobSummary& blob_sum)
1045 {
1046  m_BlobSum = blob_sum;
1047  m_SyncCtrl = ctrl;
1048  SetDiagCtx(ctrl->GetDiagCtx());
1050  m_BlobKey = key;
1052  m_OrigTime = 0;
1053  m_OrigRecNo = 0;
1054  m_OrigServer = 0;
1056 
1057  x_DoProlongOur();
1058 }
1059 
1060 void
1062 {
1063  m_SyncCtrl = ctrl;
1064  SetDiagCtx(ctrl->GetDiagCtx());
1067 
1068  m_CmdToSend.resize(0);
1069  m_CmdToSend += "SYNC_CANCEL ";
1071  m_CmdToSend.append(1, ' ');
1073 
1075 }
1076 
1077 void
1079  Uint8 local_rec_no,
1080  Uint8 remote_rec_no)
1081 {
1082  m_SyncCtrl = ctrl;
1083  SetDiagCtx(ctrl->GetDiagCtx());
1086 
1087  m_CmdToSend.resize(0);
1088  m_CmdToSend += "SYNC_COMMIT ";
1090  m_CmdToSend.append(1, ' ');
1092  m_CmdToSend.append(1, ' ');
1093  m_CmdToSend += NStr::UInt8ToString(local_rec_no);
1094  m_CmdToSend.append(1, ' ');
1095  m_CmdToSend += NStr::UInt8ToString(remote_rec_no);
1096 
1098 }
1099 
1100 CNCActiveHandler::State
1102 {
1103  if (m_Proxy->NeedToClose())
1105 #if 0
1106  if (m_GotAnyAnswer) {
1107  // In this case we already talked to server for some time and then got
1108  // a disconnect. With old NC server this could mean that connection is
1109  // closed because of inactivity timeout, in this NC server this can
1110  // mean only some network glitch. i.e. normally it shouldn't happen.
1112  }
1113 #endif
1114 
1115  // Here we will be if we attempted to connect to another NC server and got
1116  // some error as a result. This is a serious error, we cannot do anything
1117  // about that.
1120 }
1121 
1122 CNCActiveHandler::State
1124 {
1125  if (m_ErrMsg.empty()) {
1126  if (m_Proxy && m_Proxy->NeedToClose())
1127  m_ErrMsg = "ERR:Command aborted";
1128  else
1129  m_ErrMsg = "ERR:Connection closed by peer";
1130  }
1131  m_CmdSuccess = false;
1134 }
1135 
1136 CNCActiveHandler::State
1138 {
1139  m_Peer->ReleaseConn(this);
1140  if (m_Proxy) {
1141  // m_Proxy->CloseSocket() cannot be called here as it can interfere
1142  // with parallel execution of flushing
1144  m_Proxy->SetRunnable();
1145  m_Proxy = NULL;
1146  ReleaseDiagCtx();
1147  }
1149 }
1150 
1151 CNCActiveHandler::State
1153 {
1155  if (!hub)
1157  if (!hub->GetClient()->IsBlobWritingFinished())
1158  return NULL;
1159  if (m_Proxy->NeedEarlyClose())
1161 
1163 }
1164 
1165 void
1167 {
1168  m_CurCmd = eCopyPut;
1173 }
1174 
1175 void
1177 {
1181  m_CurCmd = eSyncGet;
1186 }
1187 
1188 bool
1190 {
1191  m_ProcessingStarted = true;
1192  if (!m_Proxy->StartProcessing()) {
1193  m_ProcessingStarted = false;
1194  m_Proxy->AbortSocket();
1195  m_ErrMsg = "ERR:Error in TaskServer";
1197  }
1198  return m_ProcessingStarted;
1199 }
1200 
1201 void
1203 {
1204  if (m_Client)
1206  // Theoretically there's a race here and m_ProcessingStarted should be checked
1207  // before setting new state. But if processing was already started and
1208  // m_ProcessingStarted was already changed to TRUE it will never be changed
1209  // back to FALSE again. And even if as a result of changing state this object
1210  // will become runnable on other thread, will proceed with execution and
1211  // will terminate itself, even in this case object won't be physically deleted
1212  // yet and reading of m_ProcessingStarted will still be valid. And calling
1213  // SetRunnable() in this case won't do any harm. OTOH if processing for this
1214  // object wasn't started yet and m_ProcessingStarted is FALSE then this
1215  // object's socket wasn't added to main TaskServer's epoll yet. And in this
1216  // case it's not possible for it to become runnable on the other thread.
1217  SetState(state);
1218 // if StartProcessing succeeds, then this task will wait for signal from m_Proxy
1219 // otherwise, schedule this task for execution - to do x_CloseCmdAndConn
1221  SetRunnable();
1222  }
1223 }
1224 
1225 CNCActiveHandler::State
1227 {
1228  if (m_Proxy->NeedToClose())
1230  if (m_Proxy->NeedEarlyClose())
1232 
1234  m_Proxy->RequestFlush();
1235  m_CmdSuccess = true;
1236  m_CmdStarted = true;
1238 }
1239 
1240 inline void
1242 {
1243  if (m_SyncCtrl) {
1244  m_SyncCtrl->CmdFinished(result, m_SyncAction, this, hint);
1245  m_SyncCtrl = NULL;
1246  }
1247 }
1248 
1249 void
1251 {
1252  if (m_BlobAccess) {
1253  m_BlobAccess->Release();
1254  m_BlobAccess = NULL;
1255  }
1257  if (hub) {
1258  hub->SetErrMsg(m_ErrMsg);
1259  // SetStatus MUST be called after SetErrMsg because after SetStatus
1260  // CNCMessageHandler can immediately check error message without giving
1261  // us a chance to set it.
1263  }
1265  m_ErrMsg.clear();
1266  m_Response.clear();
1267  m_CmdStarted = false;
1268  m_BlobSum.reset();
1269 
1270  // Releasing diag context only if we have command-related one. If we only
1271  // have socket-related context we leave it intact.
1272  if (GetDiagCtx() && (!m_Proxy || GetDiagCtx() != m_Proxy->GetDiagCtx()))
1273  ReleaseDiagCtx();
1274 }
1275 
1276 CNCActiveHandler::State
1278 {
1280  if (m_Client)
1282 
1284 }
1285 
1286 CNCActiveHandler::State
1288 {
1289  m_ErrMsg = m_Response;
1290  SRV_LOG(Warning, "PeerError: " << m_ErrMsg);
1291  m_CmdSuccess = true;
1295 }
1296 
1297 CNCActiveHandler::State
1299 {
1300  SRV_LOG(Critical, "Error from peer "
1302  << "Protocol error. Got response: '"
1303  << m_Response << "'");
1306 }
1307 
1308 void
1310 {
1311  Uint4 start_word = 0x01020304;
1312  m_Proxy->WriteData(&start_word, sizeof(start_word));
1313  m_ChunkSize = 0;
1314 }
1315 
1316 CNCActiveHandler::State
1318 {
1321 
1322  Uint4 finish_word = 0xFFFFFFFF;
1323  m_Proxy->WriteData(&finish_word, sizeof(finish_word));
1324  m_Proxy->RequestFlush();
1327 }
1328 
1329 CNCActiveHandler::State
1331 {
1334 }
1335 
1336 CNCActiveHandler::State
1338 {
1339  if (!m_BlobExists)
1341 
1342  list<CTempString> params;
1343  ncbi_NStr_Split(m_Response, " ", params);
1344  if (params.size() < 5)
1346 
1347  list<CTempString>::const_iterator param_it = params.begin();
1348  ++param_it;
1349  try {
1351  ++param_it;
1353  ++param_it;
1354  m_BlobSum.create_id = NStr::StringToUInt(*param_it);
1355  ++param_it;
1356  m_BlobSum.dead_time = NStr::StringToInt(*param_it);
1357  ++param_it;
1358  m_BlobSum.expire = NStr::StringToInt(*param_it);
1359  ++param_it;
1360  m_BlobSum.ver_expire = NStr::StringToInt(*param_it);
1361  m_BlobExists = true;
1363  }
1364  catch (CStringException&) {
1366  }
1367 }
1368 
1369 CNCActiveHandler::State
1371 {
1372  if (m_BlobAccess->HasError()) {
1373  m_CmdSuccess = false;
1375  }
1379  }
1382 
1383  m_CmdToSend.resize(0);
1384  if (m_SyncCtrl) {
1385  m_CmdToSend += "SYNC_PUT ";
1387  m_CmdToSend.append(1, ' ');
1389  }
1390  else {
1391  m_CmdToSend += "COPY_PUT";
1392  }
1393  m_CmdToSend += " \"";
1395  m_CmdToSend += "\" \"";
1397  m_CmdToSend += "\" \"";
1399  m_CmdToSend += "\" ";
1401  m_CmdToSend += " \"";
1403  m_CmdToSend += "\" ";
1405  m_CmdToSend.append(1, ' ');
1407  m_CmdToSend.append(1, ' ');
1409  m_CmdToSend.append(1, ' ');
1411  m_CmdToSend.append(1, ' ');
1413  m_CmdToSend.append(1, ' ');
1415  m_CmdToSend.append(1, ' ');
1417  m_CmdToSend.append(1, ' ');
1419  m_CmdToSend.append(1, ' ');
1421  m_CmdToSend.append(1, ' ');
1423  m_CmdToSend.append(" 1 ");
1424  m_CmdToSend += " \"";
1426  m_CmdToSend += "\" \"";
1428  m_CmdToSend.append(1, '"');
1430 }
1431 
1432 CNCActiveHandler::State
1434 {
1435  SIZE_TYPE pos = NStr::FindCase(m_Response, "NEED_ABORT");
1436  if (pos != NPOS) {
1438  pos += sizeof("NEED_ABORT") - 1;
1439  if (m_Response.size() > pos && m_Response[pos] == '1')
1441  else
1443  }
1444  pos = NStr::FindCase(m_Response, "HAVE_NEWER");
1445  if (pos != NPOS) {
1447  pos += sizeof("HAVE_NEWER") - 1;
1448  if (m_Response.size() > pos && m_Response[pos] == '1')
1450  else
1452  }
1453 
1456 
1461 }
1462 
1463 CNCActiveHandler::State
1465 {
1466  if (m_BlobAccess->HasError()) {
1467  m_CmdSuccess = false;
1469  }
1473  }
1474  if (m_Proxy->NeedEarlyClose())
1476 
1477  SNCBlobSummary blob_sum;
1478  blob_sum.size = m_BlobAccess->GetCurBlobSize();
1481  blob_sum.create_id = m_BlobAccess->GetCurCreateId();
1483  blob_sum.expire = m_BlobAccess->GetCurBlobExpire();
1484  blob_sum.ver_expire = m_BlobAccess->GetCurVerExpire();
1485 
1486  // m_BlobAccess is not needed anymore. It will be re-created later if
1487  // blob won't be found on peer and it will be necessary to execute
1488  // SYNC_PUT on this blob
1489  m_BlobAccess->Release();
1490  m_BlobAccess = NULL;
1491 
1492  x_SendCopyProlongCmd(blob_sum);
1493  return NULL;
1494 }
1495 
1496 void
1498 {
1500  m_BlobSum = blob_sum;
1501 
1502  m_CmdToSend.resize(0);
1503  if (m_SyncCtrl) {
1504  m_CmdToSend += "SYNC_PROLONG ";
1506  m_CmdToSend.append(1, ' ');
1508  }
1509  else {
1510  m_CmdToSend += "COPY_PROLONG";
1511  }
1512  m_CmdToSend += " \"";
1514  m_CmdToSend += "\" \"";
1516  m_CmdToSend += "\" \"";
1518  m_CmdToSend += "\" ";
1520  m_CmdToSend.append(1, ' ');
1522  m_CmdToSend.append(1, ' ');
1524  m_CmdToSend.append(1, ' ');
1526  m_CmdToSend.append(1, ' ');
1527  m_CmdToSend += NStr::IntToString(blob_sum.expire);
1528  m_CmdToSend.append(1, ' ');
1530  if (m_OrigRecNo != 0) {
1531  m_CmdToSend.append(1, ' ');
1533  m_CmdToSend.append(1, ' ');
1535  m_CmdToSend.append(1, ' ');
1537  }
1538 
1540 }
1541 
1542 CNCActiveHandler::State
1544 {
1545  if (!m_BlobExists) {
1546  if (m_Proxy->NeedEarlyClose())
1548  // Extract command-related context and pass it to CopyPut where it will
1549  // be set as current again.
1551  ReleaseDiagCtx();
1553  return NULL;
1554  }
1555 
1556  if (NStr::FindCase(m_Response, "NEED_ABORT") != NPOS)
1558  else
1560 
1562 }
1563 
1564 CNCActiveHandler::State
1566 {
1567  m_ErrMsg = m_Response;
1570 }
1571 
1572 CNCActiveHandler::State
1574 {
1575  size_t pos = m_Response.find("SIZE=");
1576  if (pos == string::npos) {
1577  SRV_LOG(Critical, "Error from peer "
1579  << "SIZE is not found in peer response");
1581  }
1582 
1583  pos += sizeof("SIZE=") - 1;
1584  try {
1585  m_SizeToReadReq =
1588  }
1589  catch (CStringException& ex) {
1590  SRV_LOG(Critical, "Error from peer "
1592  << "Cannot parse data size: " << ex);
1594  }
1595 
1596  switch (m_CurCmd) {
1597  case eReadData:
1599  case eSyncStart:
1600  case eSyncBList:
1602  case eSyncGet:
1604  default:
1605  SRV_FATAL("Unexpected command: " << m_CurCmd);
1606  }
1607 
1608  return NULL;
1609 }
1610 
1611 CNCActiveHandler::State
1613 {
1614  if (m_Proxy->NeedEarlyClose())
1616 
1618  if (!hub)
1620 
1622  client->BeginProxyResponse(m_Response, m_SizeToReadReq);
1623  // After setting m_GotClientResponse to TRUE CNCMessageHandler can immediately
1624  // continue its job. Thus we MUST set it after we (asynchronously) messed up
1625  // with the CNCMessageHandler's socket.
1626  m_GotClientResponse = true;
1627  // Make sure that m_Proxy does proxying with the correct diag context set
1628  // (in case if it needs to log something).
1632 }
1633 
1634 CNCActiveHandler::State
1636 // similar to x_ReadDataPrefix
1637 // we just have to figure out content length
1638 {
1639  if (m_Proxy->NeedEarlyClose())
1641 
1643  if (!hub)
1645 
1647  client->BeginProxyResponse(m_Response, m_SizeToReadReq);
1648 
1649  const char* keywd = "Content-Length:";
1650  while (m_Proxy->ReadLine(&m_Response)) {
1651  client->WriteText(m_Response).WriteText("\r\n");
1652  if (m_Response.empty()) {
1653  break;
1654  }
1655  CTempString::size_type pos = m_Response.find(keywd);
1656  if (pos != CTempString::npos) {
1657  try {
1658  pos += strlen(keywd);
1659  m_SizeToReadReq =
1662  }
1663  catch (CStringException& ) {
1665  }
1666  }
1667  }
1668  m_GotClientResponse = true;
1672 }
1673 
1674 CNCActiveHandler::State
1676 {
1677  if (!m_Proxy->SocketProxyDone())
1678  return NULL;
1679 
1680  // Don't forget to remove from m_Proxy diag context set in x_ReadDataPrefix().
1682  if (!m_Proxy->ProxyHadError()) {
1686  }
1687 
1688  if (m_Proxy->NeedEarlyClose())
1689  m_ErrMsg = "ERR:Error writing to peer";
1690  else
1691  m_ErrMsg = "ERR:Error writing blob data to client";
1692 
1693  m_CmdSuccess = false;
1695 }
1696 
1697 CNCActiveHandler::State
1699 {
1702  if (!hub)
1704 
1705  m_GotClientResponse = true;
1707  client->SetRunnable();
1708 
1710 }
1711 
1712 CNCActiveHandler::State
1714 {
1715  if (NStr::FindCase(m_Response, "CROSS_SYNC") != NPOS) {
1718  }
1719  if (NStr::FindCase(m_Response, "IN_PROGRESS") != NPOS) {
1722  }
1723  if (NStr::FindCase(m_Response, "NEED_ABORT") != NPOS) {
1727  }
1728 
1730 }
1731 
1732 CNCActiveHandler::State
1734 {
1735  list<CTempString> tokens;
1736  ncbi_NStr_Split(m_Response, " ", tokens);
1737  if (tokens.size() != 2 && tokens.size() != 3)
1739 
1740  list<CTempString>::const_iterator it_tok = tokens.begin();
1741  Uint8 local_rec_no = 0, remote_rec_no = 0;
1742  try {
1743  ++it_tok;
1744  remote_rec_no = NStr::StringToUInt8(*it_tok);
1745  ++it_tok;
1746  if (it_tok != tokens.end())
1747  local_rec_no = NStr::StringToUInt8(*it_tok);
1748  }
1749  catch (CStringException&) {
1751  }
1752 
1753  bool by_blobs = m_CurCmd == eSyncBList
1754  || NStr::FindCase(m_Response, "ALL_BLOBS") != NPOS;
1755 
1756  m_SyncCtrl->StartResponse(local_rec_no, remote_rec_no, by_blobs);
1757  if (by_blobs)
1759  else
1761 }
1762 
1763 CNCActiveHandler::State
1765 {
1766  if (m_Proxy->NeedEarlyClose()) {
1768  }
1769  CTempString line;
1770  if (!m_Purge) {
1771  if (!m_Proxy->ReadLine(&line)) {
1772  return NULL;
1773  }
1774  m_Purge = (line == "PURGE:");
1775  }
1776  while (m_Proxy->ReadLine(&line)) {
1777  if (line.empty() || line == ";") {
1780  }
1781  m_Purge = false;
1782  m_SyncStartExtra.clear();
1784  }
1785  m_SyncStartExtra += line;
1786  m_SyncStartExtra += '\n';
1787  }
1788  return NULL;
1789 }
1790 
1791 CNCActiveHandler::State
1793 {
1794  if (m_SizeToRead == 0) {
1796  if (m_CurCmd == eSyncStart) {
1798  }
1800  }
1801  if (m_Proxy->NeedEarlyClose())
1803 
1804  m_KeySize = 0;
1805  if (!m_Proxy->ReadNumber(&m_KeySize))
1806  return NULL;
1807 
1808  m_SizeToRead -= sizeof(m_KeySize);
1810 }
1811 
1812 CNCActiveHandler::State
1814 {
1815  if (m_Proxy->NeedEarlyClose())
1817 
1818  SNCSyncEvent* evt;
1819  Uint2 rec_size = m_KeySize + 1
1820  + sizeof(evt->rec_no)
1821  + sizeof(evt->local_time)
1822  + sizeof(evt->orig_rec_no)
1823  + sizeof(evt->orig_server)
1824  + sizeof(evt->orig_time);
1825  m_ReadBuf.resize_mem(rec_size);
1826  if (!m_Proxy->ReadData(m_ReadBuf.data(), rec_size))
1827  return NULL;
1828 
1829  const char* data = m_ReadBuf.data();
1830  evt = new SNCSyncEvent;
1831  evt->key = CTempString(data, m_KeySize);
1832  data += m_KeySize;
1833  evt->event_type = ENCSyncEvent(*data);
1834  ++data;
1835  memcpy(&evt->rec_no, data, sizeof(evt->rec_no));
1836  data += sizeof(evt->rec_no);
1837  memcpy(&evt->local_time, data, sizeof(evt->local_time));
1838  data += sizeof(evt->local_time);
1839  memcpy(&evt->orig_rec_no, data, sizeof(evt->orig_rec_no));
1840  data += sizeof(evt->orig_rec_no);
1841  memcpy(&evt->orig_server, data, sizeof(evt->orig_server));
1842  data += sizeof(evt->orig_server);
1843  memcpy(&evt->orig_time, data, sizeof(evt->orig_time));
1844 
1845  m_SizeToRead -= rec_size;
1846  if (!m_SyncCtrl->AddStartEvent(evt)) {
1847  delete evt;
1850  }
1851 
1853 }
1854 
1855 CNCActiveHandler::State
1857 {
1858  if (m_SizeToRead == 0) {
1860  if (m_CurCmd == eSyncStart) {
1862  }
1864  }
1865  if (m_Proxy->NeedEarlyClose())
1867 
1868  m_KeySize = 0;
1869  if (!m_Proxy->ReadNumber(&m_KeySize))
1870  return NULL;
1871 
1872  m_SizeToRead -= sizeof(m_KeySize);
1874 }
1875 
1876 CNCActiveHandler::State
1878 {
1879  if (m_Proxy->NeedEarlyClose())
1881 
1882  SNCBlobSummary* blob_sum;
1883  Uint2 rec_size = m_KeySize
1884  + sizeof(blob_sum->create_time)
1885  + sizeof(blob_sum->create_server)
1886  + sizeof(blob_sum->create_id)
1887  + sizeof(blob_sum->dead_time)
1888  + sizeof(blob_sum->expire)
1889  + sizeof(blob_sum->ver_expire);
1890  m_ReadBuf.resize_mem(rec_size);
1891  if (!m_Proxy->ReadData(m_ReadBuf.data(), rec_size))
1892  return NULL;
1893 
1894  const char* data = m_ReadBuf.data();
1895  blob_sum = new SNCBlobSummary();
1896  string key(data, m_KeySize);
1897  data += m_KeySize;
1898  memcpy(&blob_sum->create_time, data, sizeof(blob_sum->create_time));
1899  data += sizeof(blob_sum->create_time);
1900  memcpy(&blob_sum->create_server, data, sizeof(blob_sum->create_server));
1901  data += sizeof(blob_sum->create_server);
1902  memcpy(&blob_sum->create_id, data, sizeof(blob_sum->create_id));
1903  data += sizeof(blob_sum->create_id);
1904  memcpy(&blob_sum->dead_time, data, sizeof(blob_sum->dead_time));
1905  data += sizeof(blob_sum->dead_time);
1906  memcpy(&blob_sum->expire, data, sizeof(blob_sum->expire));
1907  data += sizeof(blob_sum->expire);
1908  memcpy(&blob_sum->ver_expire, data, sizeof(blob_sum->ver_expire));
1909 
1910  m_SizeToRead -= rec_size;
1911  if (!m_SyncCtrl->AddStartBlob(key, blob_sum)) {
1914  }
1915 
1917 }
1918 
1919 CNCActiveHandler::State
1921 {
1922  if (m_BlobAccess->HasError()) {
1923  m_CmdSuccess = false;
1925  }
1926  if (m_BlobAccess->IsBlobExists()
1928  {
1931  }
1932  if (m_Proxy->NeedEarlyClose())
1934 
1935  m_CmdToSend.resize(0);
1936  m_CmdToSend += "SYNC_GET ";
1938  m_CmdToSend.append(1, ' ');
1940  m_CmdToSend += " \"";
1942  m_CmdToSend += "\" \"";
1944  m_CmdToSend += "\" \"";
1946  m_CmdToSend += "\" ";
1948  m_CmdToSend.append(1, ' ');
1950  m_CmdToSend.append(1, ' ');
1952  m_CmdToSend.append(1, ' ');
1955 }
1956 
1957 CNCActiveHandler::State
1959 {
1960  if (!m_BlobExists) {
1963  }
1964 
1965  if (NStr::FindCase(m_Response, "NEED_ABORT") != NPOS) {
1968  }
1969  if (NStr::FindCase(m_Response, "HAVE_NEWER") != NPOS) {
1972  }
1973 
1975 }
1976 
1977 CNCActiveHandler::State
1979 {
1980  list<CTempString> tokens;
1981  ncbi_NStr_Split(m_Response, " ", tokens);
1982  if (tokens.size() != 11) {
1984  }
1985  list<CTempString>::const_iterator it_tok = tokens.begin();
1986  try {
1987  ++it_tok;
1989  ++it_tok;
1990  m_BlobAccess->SetPassword(it_tok->substr(1, it_tok->size() - 2));
1991  ++it_tok;
1992  Uint8 create_time = NStr::StringToUInt8(*it_tok);
1993  m_BlobAccess->SetBlobCreateTime(create_time);
1994  ++it_tok;
1995  m_BlobAccess->SetBlobTTL(int(NStr::StringToUInt(*it_tok)));
1996  ++it_tok;
1997  int dead_time = NStr::StringToInt(*it_tok);
1998  ++it_tok;
1999  int expire = NStr::StringToInt(*it_tok);
2000  m_BlobAccess->SetNewBlobExpire(expire, dead_time);
2001  ++it_tok;
2003  ++it_tok;
2005  ++it_tok;
2006  Uint8 create_server = NStr::StringToUInt8(*it_tok);
2007  ++it_tok;
2008  Uint4 create_id = NStr::StringToUInt(*it_tok);
2009  m_BlobAccess->SetCreateServer(create_server, create_id);
2010  }
2011  catch (CStringException&) {
2013  }
2014 
2016 }
2017 
2018 CNCActiveHandler::State
2020 {
2021  if (m_Proxy->NeedEarlyClose())
2023 
2024  while (m_SizeToRead != 0) {
2025  Uint4 read_len = Uint4(m_BlobAccess->GetWriteMemSize());
2026  if (m_BlobAccess->HasError()) {
2027  m_ErrMsg = "ERR:Error writing blob to database";
2029  }
2030 
2031  Uint4 n_read = Uint4(m_Proxy->Read(m_BlobAccess->GetWriteMemPtr(), read_len));
2032  if (n_read != 0)
2033  CNCStat::PeerDataWrite(n_read);
2034  if (m_Proxy->NeedEarlyClose())
2036  if (n_read == 0)
2037  return NULL;
2038 
2039  m_BlobAccess->MoveWritePos(n_read);
2040  m_SizeToRead -= n_read;
2041  }
2042 
2044  if (m_BlobAccess->HasError()) {
2045  m_CmdSuccess = false;
2047  }
2048 
2049  if (m_OrigRecNo != 0) {
2050  SNCSyncEvent* event = new SNCSyncEvent();
2051  event->blob_size = m_BlobAccess->GetCurBlobSize();
2052  event->event_type = eSyncWrite;
2053  event->key = m_BlobKey;
2054  event->orig_server = m_OrigServer;
2055  event->orig_time = m_OrigTime;
2056  event->orig_rec_no = m_OrigRecNo;
2058  }
2059 
2062 }
2063 
2064 CNCActiveHandler::State
2066 {
2067  if (!m_BlobExists) {
2070  }
2071 
2072  if (NStr::FindCase(m_Response, "NEED_ABORT") != NPOS) {
2075  }
2076  if (NStr::FindCase(m_Response, "HAVE_NEWER") != NPOS) {
2079  }
2080 
2081  list<CTempString> tokens;
2082  ncbi_NStr_Split(m_Response, " ", tokens);
2083  if (tokens.size() != 7)
2085 
2086  list<CTempString>::const_iterator it_tok = tokens.begin();
2087  try {
2088  ++it_tok;
2090  ++it_tok;
2092  ++it_tok;
2094  ++it_tok;
2096  ++it_tok;
2097  m_BlobSum.expire = NStr::StringToInt(*it_tok);
2098  ++it_tok;
2100  }
2101  catch (CStringException&) {
2103  }
2104 
2105  if (m_Proxy->NeedEarlyClose())
2107 
2108  // x_DoProlongOur will change our state to continue processing.
2109  x_DoProlongOur();
2110  return NULL;
2111 }
2112 
2113 CNCActiveHandler::State
2115 {
2116  Uint8 ver = 0;
2117  try {
2118  list<CTempString> tokens;
2119  ncbi_NStr_Split(m_Response, "&", tokens);
2120  ITERATE( list<CTempString>, t, tokens) {
2121  if (NStr::StartsWith(*t, "protocol_version")) {
2122  CTempString one, two;
2123  if (NStr::SplitInTwo(*t, "=", one, two)) {
2124  list<CTempString> v;
2125  ncbi_NStr_Split(two, ".", v);
2126  if (v.size() >= 3) {
2127  for (int i=0; i<3; ++i) {
2128  ver = ver * 100 + NStr::StringToUInt8( v.front());
2129  v.pop_front();
2130  }
2131  }
2132  }
2133  break;
2134  }
2135  }
2136  } catch (...) {
2137  }
2138  m_Peer->SetHostProtocol(ver);
2140 }
2141 
2142 void
2144 {
2149 }
2150 
2151 CNCActiveHandler::State
2153 {
2154  if (m_BlobAccess->HasError()) {
2155  m_CmdSuccess = false;
2157  }
2158  if (m_Proxy->NeedEarlyClose())
2160  if (!m_BlobAccess->IsBlobExists()) {
2161  m_BlobAccess->Release();
2162  m_BlobAccess = NULL;
2164  return NULL;
2165  }
2166 
2167  bool need_event = false;
2171  {
2174  need_event = true;
2175  }
2178  need_event = true;
2179  }
2180  }
2181 
2182  if (need_event && m_OrigRecNo != 0) {
2183  SNCSyncEvent* event = new SNCSyncEvent();
2184  event->blob_size = m_BlobAccess->GetCurBlobSize();
2185  event->event_type = eSyncProlong;
2186  event->key = m_BlobKey;
2187  event->orig_server = m_OrigServer;
2188  event->orig_time = m_OrigTime;
2189  event->orig_rec_no = m_OrigRecNo;
2191  }
2192 
2195 }
2196 
2197 CNCActiveHandler::State
2199 {
2200  if (m_CmdFromClient && !m_Client)
2202  if (!m_Proxy->FlushIsDone()) {
2203  return NULL;
2204  }
2205 
2206  bool has_line = m_Proxy->ReadLine(&m_Response);
2207  if (!has_line && m_Proxy->NeedEarlyClose()) {
2208  if (m_GotCmdAnswer)
2211  }
2212  if (!has_line) {
2213  return NULL;
2214  }
2215 
2216  if (!m_GotAnyAnswer) {
2217  m_GotAnyAnswer = true;
2219  }
2220 
2221  m_GotCmdAnswer = true;
2222  m_BlobExists = true;
2223  if (NStr::StartsWith(m_Response, "ERR:")) {
2224  m_BlobExists = false;
2226  }
2227  else if (NStr::StartsWith(m_Response, "HTTP") && m_CurCmd == eReadData) {
2229  }
2230  else if (!NStr::StartsWith(m_Response, "OK:")) {
2232  }
2233 
2234  switch (m_CurCmd) {
2235  case eSearchMeta:
2237  case eCopyPut:
2239  case eCopyProlong:
2241  case eReadData:
2242  if (m_BlobExists)
2244  // fall through
2245  case eNeedOnlyConfirm:
2247  case eWriteData:
2249  case eSyncStart:
2250  case eSyncBList:
2252  case eSyncGet:
2254  case eSyncProInfo:
2256  case ePeerVersion:
2258  default:
2259  SRV_FATAL("Unexpected command: " << m_CurCmd);
2260  }
2261 
2262  return NULL;
2263 }
2264 
2265 CNCActiveHandler::State
2267 {
2268  if (!m_BlobAccess->IsMetaInfoReady())
2269  return NULL;
2270  if (m_Proxy->NeedEarlyClose())
2272 
2273  switch (m_CurCmd) {
2274  case eCopyPut:
2276  case eSyncGet:
2278  case eSyncProlongPeer:
2280  case eSyncProInfo:
2282  default:
2283  SRV_FATAL("Unexpected command: " << m_CurCmd);
2284  }
2285 
2286  return NULL;
2287 }
2288 
2289 CNCActiveHandler::State
2291 {
2292  for(;;) {
2293  if (m_ChunkSize == 0) {
2296  Uint8(0xFFFFFFFE)));
2297  if (m_ChunkSize == 0)
2299 
2301  }
2302 
2303  Uint4 want_read = m_BlobAccess->GetReadMemSize();
2304  if (m_BlobAccess->HasError()) {
2305  m_ErrMsg = "ERR:Blob data is corrupted";
2307  }
2308  if (m_ChunkSize < want_read)
2309  want_read = m_ChunkSize;
2310 
2311  Uint4 n_written = Uint4(m_Proxy->Write(m_BlobAccess->GetReadMemPtr(), want_read));
2312  if (n_written != 0)
2313  CNCStat::PeerDataRead(n_written);
2316  if (n_written == 0)
2317  return NULL;
2318 
2319  m_ChunkSize -= n_written;
2320  m_BlobAccess->MoveReadPos(n_written);
2321  }
2322 }
2323 
2324 CNCActiveHandler::State
2326 {
2327  if (m_Client)
2328  return NULL;
2330 }
2331 
2332 CNCActiveHandler::State
2334 {
2335  if (m_Proxy->NeedEarlyClose())
2337 
2338  m_CmdFromClient = false;
2339  m_GotCmdAnswer = false;
2340  m_GotClientResponse = false;
2342  m_Peer->PutConnToPool(this);
2343  return NULL;
2344 }
2345 
2346 CNCActiveHandler::State
2348 {
2349  return NULL;
2350 }
2351 
2352 void
2354 {
2356  m_Proxy->SetRunnable();
2357  m_Proxy = NULL;
2358  Terminate();
2359 }
2360 
2361 void
2363 {
2365  if (!proxy) {
2366  return;
2367  }
2368 
2371 #ifdef _DEBUG
2372 CNCAlerts::Register(CNCAlerts::eDebugSyncAborted2, "CheckCommandTimeout");
2373 #endif
2374  SRV_LOG(Warning, "Command aborted by timeout");
2375  proxy->m_NeedToClose = true;
2376  proxy->SetRunnable();
2377  }
2378 
2379  if (!m_CmdStarted)
2380  return;
2381 
2382  int delay_time = CSrvTime::CurSecs() - proxy->m_LastActive;
2383  if (delay_time > CNCDistributionConf::GetPeerTimeout()
2384  && ((m_CurCmd != eSyncBList && m_CurCmd != eSyncStart)
2385  || delay_time > CNCDistributionConf::GetBlobListTimeout()))
2386  {
2387  proxy->m_NeedToClose = true;
2388  proxy->SetRunnable();
2389  }
2390 }
2391 
static string s_PeerAuthString
ENCClientHubStatus
@ eNCHubSuccess
@ eNCHubCmdInProgress
@ eNCHubError
Uint4 GetDefaultTaskPriority(void)
Definition: scheduler.cpp:587
#define false
Definition: bool.h:36
CNCActiveHandler * m_Handler
CNCMessageHandler * m_Client
CNCMessageHandler * GetClient(void)
CNCActiveClientHub(CNCMessageHandler *client)
virtual void ExecuteRCU(void)
Method implementing RCU job that was scheduled earlier by CallRCU().
void SetStatus(ENCClientHubStatus status)
static CNCActiveClientHub * Create(Uint8 srv_id, CNCMessageHandler *client)
ENCClientHubStatus m_Status
void SetErrMsg(const string &msg)
string GetFullPeerName(void)
CNCActiveHandler_Proxy(CNCActiveHandler *handler)
virtual void ExecuteSlice(TSrvThreadNum thr_num)
This is the main method to do all work this task should do.
virtual ~CNCActiveHandler_Proxy(void)
CNCActiveHandler * m_Handler
void SetHandler(CNCActiveHandler *handler)
State x_ProcessProtocolError(void)
State x_ReadSyncStartAnswer(void)
void x_CleanCmdResources(void)
State x_ReadCopyProlong(void)
bool IsReservedForBG(void)
State x_ReadBlobsListBody(void)
void x_StartWritingBlob(void)
void ProxyHasBlob(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, Uint1 quorum)
State x_SendCopyPutCmd(void)
void ProxySetValid(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version)
CNCBlobAccessor * m_BlobAccess
State x_ReplaceServerConn(void)
void SyncProlongPeer(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
CTempString m_Response
void SyncRead(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
State x_ReadSyncProInfoAnswer(void)
State x_ProcessPeerError(void)
State x_ReadBlobData(void)
void SyncSend(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
State x_SendSyncGetCmd(void)
void CopyPurge(CRequestContext *cmd_ctx, const CNCBlobKeyLight &key, Uint8 when)
void ProxyWrite(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint4 ttl, Uint1 quorum, TNCUserFlags flags)
bool x_StartProcessing(void)
State x_ReadSyncStartExtra(void)
State x_FakeWritingBlob(void)
CNCActiveClientHub * m_Client
CNCActiveHandler_Proxy * m_Proxy
State x_ReadEventsListBody(void)
State x_SendCmdToExecute(void)
void CopyUpdate(const CNCBlobKeyLight &key, Uint8 create_time)
void ProxyBList(CRequestContext *cmd_ctx, const CNCBlobKey &key, bool force_local, SNCBlobFilter *filters)
void ProxyRead(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint8 start_pos, Uint8 size, Uint1 quorum, bool search, bool force_local, Uint8 age)
State x_ReadConfirm(void)
State x_ReadBlobsListKeySize(void)
State x_FinishWritingBlob(void)
void SetReservedForBG(bool value)
State x_ReadCopyPut(void)
void SetClientHub(CNCActiveClientHub *hub)
void x_SendCopyProlongCmd(const SNCBlobSummary &blob_sum)
CNCActiveHandler(Uint8 srv_id, CNCPeerControl *peer)
void SyncCancel(CNCActiveSyncControl *ctrl)
void CopyRemove(const CNCBlobKeyLight &key, Uint8 create_time)
SNCBlobSummary m_BlobSum
State x_ReadSyncGetAnswer(void)
State x_FinishCommand(void)
State x_WaitForMetaInfo(void)
void SyncStart(CNCActiveSyncControl *ctrl, Uint8 local_rec_no, Uint8 remote_rec_no)
void ProxyRemove(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint1 quorum)
State x_PrepareSyncProlongCmd(void)
State x_ReadEventsListKeySize(void)
ESynActionType m_SyncAction
TNCBufferType m_ReadBuf
State x_ReadHttpDataPrefix(void)
State x_WriteBlobData(void)
void SyncCommit(CNCActiveSyncControl *ctrl, Uint8 local_rec_no, Uint8 remote_rec_no)
State x_ReadFoundMeta(void)
State x_IdleState(void)
CNCActiveSyncControl * m_SyncCtrl
void ClientReleased(void)
State x_ConnClosedReplaceable(void)
void AskPeerVersion(void)
void ProxyGetMeta(CRequestContext *cmd_ctx, const CNCBlobKey &key, Uint1 quorum, bool force_local, int http)
static void Initialize(void)
void ProxyProlong(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, unsigned int add_time, Uint1 quorum, bool search, bool force_local)
void SetProxy(CNCActiveHandler_Proxy *proxy)
CNCPeerControl * m_Peer
void x_SetStateAndStartProcessing(State state)
State x_ExecuteProInfoCmd(void)
void x_FinishSyncCmd(ESyncResult result, int hint)
void ProxyReadLast(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, Uint8 start_pos, Uint8 size, Uint1 quorum, bool search, bool force_local, Uint8 age)
State x_InvalidState(void)
void SyncBlobsList(CNCActiveSyncControl *ctrl)
void CloseForShutdown(void)
State x_CloseCmdAndConn(void)
State x_FinishBlobFromClient(void)
void SyncProlongOur(CNCActiveSyncControl *ctrl, SNCSyncEvent *event)
void ProxyGetSize(CRequestContext *cmd_ctx, const CNCBlobKey &key, const string &password, int version, Uint1 quorum, bool search, bool force_local)
State x_ReadDataPrefix(void)
State x_WaitOneLineAnswer(void)
void ResetSizeRdWr(void)
void CheckCommandTimeout(void)
State x_ReadWritePrefix(void)
State x_CloseConn(void)
CNCBlobKeyLight m_BlobKey
State x_ReadPeerVersion(void)
void CopyProlong(const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no, Uint8 orig_time, const SNCBlobSummary &blob_sum)
Uint8 GetSrvId(void) const
State x_WaitClientRelease(void)
void CopyPut(CRequestContext *cmd_ctx, const CNCBlobKeyLight &key, Uint2 slot, Uint8 orig_rec_no)
State x_ReadSyncGetHeader(void)
void x_DoProlongOur(void)
State x_MayDeleteThis(void)
State x_ReadSizeToRead(void)
virtual ~CNCActiveHandler(void)
void x_SetSlotAndBucketAndVerifySlot(Uint2 slot)
void SearchMeta(CRequestContext *cmd_ctx, const CNCBlobKey &key)
State x_ReadDataForClient(void)
State x_ReadSyncStartHeader(void)
State x_PutSelfToPool(void)
bool AddStartEvent(SNCSyncEvent *evt)
bool AddStartBlob(const string &key, SNCBlobSummary *blob_sum)
void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler *conn, int hint)
void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs)
static void Register(EAlertType alert_type, const string &message)
Definition: nc_utils.cpp:188
@ eDebugSyncAborted2
Definition: nc_utils.hpp:247
void * GetWriteMemPtr(void)
Uint8 GetCurBlobSize(void) const
Get size of the blob.
void RequestMetaInfo(CSrvTask *owner)
void SetNewVerExpire(int dead_time)
void MoveReadPos(Uint4 move_size)
bool IsMetaInfoReady(void)
size_t GetWriteMemSize(void)
void SetCurBlobExpire(int expire, int dead_time=0)
unsigned int GetCurBlobTTL(void) const
void SetCurVerExpire(int dead_time)
unsigned int GetCurVersionTTL(void) const
void SetPassword(CTempString password)
bool IsCurBlobDead(void) const
void SetNewBlobExpire(int expire, int dead_time=0)
int GetCurBlobDeadTime(void) const
void SetVersionTTL(int ttl)
static bool UpdatePurgeData(const string &data, char separator='\n')
Uint4 GetCurCreateId(void) const
void SetBlobTTL(unsigned int ttl)
Set blob's timeout after last access before it will be deleted.
Uint8 GetCurBlobCreateTime(void) const
Uint8 GetCurCreateServer(void) const
void SetCreateServer(Uint8 create_server, Uint4 create_id)
void MoveWritePos(Uint4 move_size)
int GetCurBlobVersion(void) const
Uint4 GetReadMemSize(void)
Uint8 GetPosition(void)
int GetCurBlobExpire(void) const
string GetCurPassword(void) const
void SetPosition(Uint8 pos)
Initially set current position in the blob to start reading from.
int GetCurVerExpire(void) const
bool HasError(void) const
void SetBlobVersion(int ver)
void SetBlobCreateTime(Uint8 create_time)
const void * GetReadMemPtr(void)
bool IsBlobExists(void) const
Check if blob exists.
void Release(void)
Release blob lock.
const string & PackedKey(void) const
Definition: netcached.hpp:180
const CTempString & RawKey(void) const
Definition: netcached.hpp:184
const CTempString & SubKey(void) const
Definition: netcached.hpp:188
const CTempString & Cache(void) const
Definition: netcached.hpp:192
static void SavePurgeData(void)
static CNCBlobAccessor * GetBlobAccess(ENCAccessType access, const string &key, const string &password, Uint2 time_bucket)
Acquire access to the blob identified by key, subkey and version.
static Uint4 GetSyncPriority(void)
static string GetFullPeerName(Uint8 srv_id)
static Uint8 GetNetworkErrorTimeout(void)
static Uint1 GetBlobListTimeout(void)
static Uint8 GetSelfID(void)
static Uint1 GetPeerTimeout(void)
Handler of all NetCache incoming requests.
bool IsBlobWritingFinished(void)
void AbortInitialSync(void)
void RegisterConnError(void)
void RegisterConnSuccess(void)
void PutConnToPool(CNCActiveHandler *conn)
CNCActiveHandler * GetPooledConn(void)
bool AcceptsUserFlags(void) const
void ReleaseConn(CNCActiveHandler *conn)
bool CreateNewSocket(CNCActiveHandler *conn)
void SetHostProtocol(Uint8 ver)
void AssignClientConn(CNCActiveClientHub *hub)
static CNCPeerControl * Peer(Uint8 srv_id)
static void ClientDataRead(size_t data_size)
Definition: nc_stat.cpp:501
static void PeerDataWrite(size_t data_size)
Definition: nc_stat.cpp:541
static void PeerDataRead(size_t data_size)
Definition: nc_stat.cpp:547
static Uint8 AddEvent(Uint2 slot, SNCSyncEvent *event)
Definition: sync_log.cpp:559
const value_type * data() const
void resize_mem(size_type new_size)
Resize the buffer. No data preservation.
void CallRCU(void)
Method to be called to schedule call of ExecuteRCU() at appropriate time.
Definition: rcu.cpp:194
Special variant of CRef that doesn't check for NULL when dereferencing.
Definition: srv_ref.hpp:41
bool IsProxyInProgress(void)
Check whether proxying started earlier is still in progress.
bool NeedToClose(void)
Checks if socket should be closed because of long inactivity or because server is in "hard" shutdown ...
size_t Write(const void *buf, size_t size)
Write into the socket as much as immediately possible (including writing into internal write buffers ...
void AbortSocket(void)
Abort the socket, i.e.
CSrvSocketTask & WriteText(CTempString message)
Write text into socket.
bool ProxyHadError(void)
Check whether proxying started earlier finished successfully or any of sockets had some error in it.
virtual void Terminate(void)
Terminate the task.
bool m_NeedToClose
Flag showing that socket needs to be closed because of long inactivity.
bool ReadData(void *buf, Uint2 size)
Read from socket exactly the given data size.
void RequestFlush(void)
Request flushing of all data saved in internal write buffers to socket.
size_t Read(void *buf, size_t size)
Read from socket into memory.
bool ReadNumber(NumType *num)
Read from socket a number in native machine representation.
void WriteData(const void *buf, size_t size)
Write the exact amount of data into the socket.
bool StartProcessing(TSrvThreadNum thread_num=0, bool boost=false)
Start processing of the socket and include it into TaskServer's central epoll.
bool ReadLine(CTempString *line)
Read from socket one line which ends with ' ', '\r ' or '\0'.
bool NeedEarlyClose(void)
Checks if socket should be closed because of internal reasons (long inactivity or "hard" shutdown as ...
void StartProxyTo(CSrvSocketTask *dst_task, Uint8 proxy_size)
Start proxying of raw data from this socket to the one in dst_task.
bool FlushIsDone(void)
Check if data flushing requested earlier is complete.
void SetState(State state)
Sets current state of the machine.
Definition: srv_tasks.hpp:310
void SetDiagCtx(CRequestContext *ctx)
Set diagnostic context for this task to work in.
Definition: logging.cpp:1197
int m_LastActive
Time (in seconds) when the task was active last time, i.e.
Definition: srv_tasks.hpp:212
CRequestContext * GetDiagCtx(void)
Get current diagnostic context for the task.
void SetPriority(Uint4 prty)
Set and retrieve task's priority.
void SetRunnable(bool boost=false)
Set this task "runnable", i.e.
Definition: scheduler.cpp:618
virtual void Terminate(void)
Stops task's execution and deletes it.
Definition: scheduler.cpp:612
void ReleaseDiagCtx(void)
Releases current diagnostic context of the task.
Definition: logging.cpp:1257
static int CurSecs(void)
Current time in seconds since epoch (time_t).
CStringException –.
Definition: ncbistr.hpp:4505
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
static void StartSyncBlob(Uint8 create_time)
void(*)(CSeq_entry_Handle seh, IWorkbench *wb, const CSerialObject &obj) handler
char value[7]
Definition: config.c:431
static uch flags
CS_CONTEXT * ctx
Definition: t0006.c:12
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NULL
Definition: ncbistd.hpp:225
string GetSessionID(void) const
Session ID.
string GetClientIP(void) const
Client IP/hostname.
void SetRequestStatus(int status)
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
uint8_t Uint1
1-byte (8-bit) unsigned integer
Definition: ncbitype.h:99
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
uint16_t Uint2
2-byte (16-bit) unsigned integer
Definition: ncbitype.h:101
int64_t Int8
8-byte (64-bit) signed integer
Definition: ncbitype.h:104
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:132
static string Int8ToString(Int8 value, TNumToStringFlags flags=0, int base=10)
Convert Int8 to string.
Definition: ncbistr.hpp:5158
#define kEmptyStr
Definition: ncbistr.hpp:123
static int StringToInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to int.
Definition: ncbistr.cpp:630
#define NPOS
Definition: ncbistr.hpp:133
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5083
bool empty(void) const
Return true if the represented string is empty (i.e., the length is zero)
Definition: tempstr.hpp:334
static SIZE_TYPE FindCase(const CTempString str, const CTempString pattern, SIZE_TYPE start, SIZE_TYPE end, EOccurrence which=eFirst)
Find the pattern in the specified range of a string using a case sensitive search.
Definition: ncbistr.hpp:5489
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5108
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5411
void clear(void)
Clears the string.
Definition: tempstr.hpp:351
size_t size_type
Definition: tempstr.hpp:70
static Uint8 StringToUInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Uint8.
Definition: ncbistr.cpp:873
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
CTempString substr(size_type pos) const
Obtain a substring from this string, beginning at a given offset.
Definition: tempstr.hpp:776
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
size_type find(const CTempString match, size_type pos=0) const
Find the first instance of the entire matching string within the current string, beginning at an opti...
Definition: tempstr.hpp:655
size_type size(void) const
Return the length of the represented array.
Definition: tempstr.hpp:327
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
Definition: ncbistr.hpp:5167
static const size_type npos
Definition: tempstr.hpp:72
@ fAllowTrailingSymbols
Ignore trailing non-numerics characters.
Definition: ncbistr.hpp:298
@ fAllowLeadingSpaces
Ignore leading spaces in converted string.
Definition: ncbistr.hpp:294
int i
static int version
Definition: mdb_load.c:29
Uint4 TNCUserFlags
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const string & GetMessageByStatus(EHTTPStatus sts)
Definition: nc_utils.cpp:97
static const char *const kNCPeerClientName
Definition: nc_utils.hpp:56
@ eStatus_OK
Command is ok and execution is good.
Definition: nc_utils.hpp:115
@ eStatus_BadPeer
Peer returned something wrong.
Definition: nc_utils.hpp:155
@ eNCReadData
Read blob data.
Definition: nc_utils.hpp:88
@ eNCCopyCreate
(Re-)write blob from another NetCache (as opposed to writing from client)
Definition: nc_utils.hpp:90
@ eNCRead
Read meta information only.
Definition: nc_utils.hpp:87
EIPRangeType t
Definition: ncbi_localip.c:101
T min(T x_, T y_)
@ eSynActionNone
@ eSynActionProlong
@ eSynActionRead
@ eSynActionWrite
#define NC_SYNC_HINT
ESyncResult
@ eSynServerBusy
@ eSynNetworkError
@ eSynOK
@ eSynCrossSynced
@ eSynAborted
Defines CRequestContext class for NCBI C++ diagnostic API.
static CNamedPipeClient * client
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
Definition: srv_diag.hpp:162
#define SRV_FATAL(msg)
Definition: srv_diag.hpp:173
#define ACCESS_ONCE(x)
Purpose of this macro is to force compiler to access variable exactly at the place it's written (no m...
Definition: srv_sync.hpp:51
@ kUSecsPerSecond
Definition: srv_time.hpp:46
Uint8 exp_epoch_ge
Definition: nc_db_info.hpp:274
Uint8 vexp_epoch_lt
Definition: nc_db_info.hpp:279
Uint8 vexp_epoch_ge
Definition: nc_db_info.hpp:278
Uint8 exp_epoch_lt
Definition: nc_db_info.hpp:275
void reset(void)
Definition: nc_db_info.hpp:217
Single event record.
Definition: sync_log.hpp:52
Uint8 orig_time
Definition: sync_log.hpp:57
CNCBlobKeyLight key
Definition: sync_log.hpp:55
Uint8 orig_server
Definition: sync_log.hpp:59
Uint8 orig_rec_no
Definition: sync_log.hpp:61
Uint8 local_time
Definition: sync_log.hpp:63
Uint8 rec_no
Definition: sync_log.hpp:53
ENCSyncEvent event_type
Definition: sync_log.hpp:56
ENCSyncEvent
Event types to log.
Definition: sync_log.hpp:43
@ eSyncProlong
Definition: sync_log.hpp:45
@ eSyncWrite
Definition: sync_log.hpp:44
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
Definition: task_server.hpp:42
else result
Definition: token2.c:20
Modified on Fri Dec 01 04:47:56 2023 by modify_doxy.py rev. 669887