NCBI C++ ToolKit
neticache_client.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: neticache_client.cpp 91247 2020-09-30 19:02:00Z sadyrovr $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Anatoliy Kuznetsov, Dmitry Kazimirov
27  *
28  * File Description:
29  * Implementation of netcache ICache client.
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 
35 #include "netcache_api_impl.hpp"
36 
41 
43 
44 #include <util/cache/icache_cf.hpp>
45 #include <util/transmissionrw.hpp>
46 
47 #include <corelib/ncbitime.hpp>
48 #include <corelib/request_ctx.hpp>
50 #include <corelib/ncbi_system.hpp>
52 
53 #include <memory>
54 #include <sstream>
55 
56 #define MAX_ICACHE_CACHE_NAME_LENGTH 64
57 #define MAX_ICACHE_KEY_LENGTH 256
58 #define MAX_ICACHE_SUBKEY_LENGTH 256
59 
60 #define NCBI_USE_ERRCODE_X ConnServ_NetCache
61 
62 
64 
65 const char* const kNetICacheDriverName = "netcache";
66 
67 static string s_CheckKeySubkey(
68  const string& key, const string& subkey, string* encoded_key)
69 {
70  encoded_key->push_back('"');
71  encoded_key->append(NStr::PrintableString(key));
72 
73  string encoded_subkey(NStr::PrintableString(subkey));
74 
75  if (encoded_key->length() > (1 + MAX_ICACHE_KEY_LENGTH) ||
76  encoded_subkey.length() > MAX_ICACHE_SUBKEY_LENGTH) {
77  NCBI_THROW(CNetCacheException, eKeyFormatError,
78  "ICache key or subkey is too long");
79  }
80 
81  return encoded_subkey;
82 }
83 
84 static string s_KeySubkeyToBlobID(const string& key, const string& subkey)
85 {
86  string blob_id(kEmptyStr);
87  blob_id.reserve(1 + key.length() + 3 + subkey.length() + 1);
88 
89  string encoded_subkey(s_CheckKeySubkey(key, subkey, &blob_id));
90 
91  blob_id.append("\" \"", 3);
92 
93  blob_id.append(encoded_subkey);
94 
95  blob_id.push_back('"');
96 
97  return blob_id;
98 }
99 
101  const string& key, int version, const string& subkey)
102 {
103  string blob_id(kEmptyStr);
104  blob_id.reserve(1 + key.length() + 2 +
105  int((double)sizeof(version) * 1.5) + 2 + subkey.length() + 1);
106 
107  string encoded_subkey(s_CheckKeySubkey(key, subkey, &blob_id));
108 
109  blob_id.append("\" ", 2);
110  blob_id.append(NStr::IntToString(version));
111  blob_id.append(" \"", 2);
112 
113  blob_id.append(encoded_subkey);
114 
115  blob_id.push_back('"');
116 
117  return blob_id;
118 }
119 
120 static const char s_NetICacheAPIName[] = "NetICacheClient";
121 
123 {
125  const string& section,
126  const string& service_name,
127  const string& client_name,
128  const string& cache_name) :
129  m_CacheFlags(ICache::fBestPerformance)
130  {
131  m_DefaultParameters.SetCacheName(cache_name);
132  SRegSynonyms sections{ section, "netcache_api", "netcache_client", kNetICacheDriverName };
133  m_Service = SNetServiceImpl::Create(s_NetICacheAPIName, service_name, client_name,
135  registry_builder, sections);
136  Init(registry_builder, sections);
137  }
138 
140  SNetCacheAPIImpl(server, parent),
141  m_CacheFlags(parent->m_CacheFlags)
142  {
143  }
144 
146  const string& key,
147  bool multiline_output,
148  const CNetCacheAPIParameters* parameters);
149 
150  string MakeStdCmd(const char* cmd_base, const string& blob_id,
151  const CNetCacheAPIParameters* parameters,
152  const string& injection = kEmptyStr);
153 
154  string ExecStdCmd(const char* cmd_base, const string& key,
155  int version, const string& subkey,
156  const CNetCacheAPIParameters* parameters);
157 
159  const CNetCacheAPIParameters* parameters);
160 
162  const string& key, const string& subkey,
163  size_t* blob_size_ptr,
164  int* version,
166  unsigned max_age, unsigned* actual_age,
167  const CNamedParameterList* optional = NULL);
168 
169  IReader* GetReadStreamPart(const string& key,
170  int version, const string& subkey,
171  size_t offset, size_t part_size,
172  size_t* blob_size_ptr,
173  const CNamedParameterList* optional);
174 
175  void Init(CSynRegistry& registry, const SRegSynonyms& sections);
176 
178 };
179 
181 {
182  SNetCacheAPIImpl::Init(registry, sections);
183 
184  auto cache_name = m_DefaultParameters.GetCacheName();
185 
186  if (cache_name.empty()) cache_name = registry.Get(sections, { "name", "cache_name" }, "default_cache");
187 
188  if (cache_name.length() > MAX_ICACHE_CACHE_NAME_LENGTH) {
189  NCBI_THROW(CNetCacheException, eAuthenticationError, "NetICache: cache name is too long");
190  }
191 
192  m_DefaultParameters.SetCacheName(cache_name);
193  m_DefaultParameters.SetTryAllServers(registry.Get(sections, "try_all_servers", false));
194 }
195 
197  CNetCacheWriter* nc_writer, const CNetCacheAPIParameters* parameters)
198 {
199  string cmd("IC(" + NStr::PrintableString(parameters->GetCacheName()));
200  cmd.append(") STOR ");
201 
202  cmd.append(NStr::UIntToString(parameters->GetTTL()));
203  cmd.push_back(' ');
204  cmd.append(nc_writer->GetBlobID());
205  if (nc_writer->GetResponseType() == eNetCache_Wait)
206  cmd.append(" confirm=1");
208  if (m_FlagsOnWrite) cmd.append(" flags=").append(to_string(m_FlagsOnWrite));
209 
210  return ChooseServerAndExec(cmd, nc_writer->GetKey(),
211  false, parameters).conn;
212 }
213 
215  const string& conf_section) :
216  m_Impl(new SNetICacheClientImpl(NULL, conf_section,
218 {
219 }
220 
222  const string& host,
223  unsigned short port,
224  const string& cache_name,
225  const string& client_name) :
226  m_Impl(new SNetICacheClientImpl(NULL, kEmptyStr,
227  host + ':' + NStr::UIntToString(port),
228  client_name, cache_name))
229 {
230 }
231 
233  const string& service_name,
234  const string& cache_name,
235  const string& client_name) :
236  m_Impl(new SNetICacheClientImpl(NULL, kEmptyStr,
237  service_name, client_name, cache_name))
238 {
239 }
240 
241 CNetICacheClient::CNetICacheClient(CConfig* config, const string& driver_name) :
242  m_Impl(new SNetICacheClientImpl(config, driver_name,
244 {
245 }
246 
247 CNetICacheClient::CNetICacheClient(const IRegistry& reg, const string& section) :
248  m_Impl(new SNetICacheClientImpl(reg, section,
250 {
251 }
252 
254 {
255  m_Impl->m_Service->m_ServerPool.SetCommunicationTimeout(to);
256 }
257 
258 
260 {
261  return m_Impl->m_Service->m_ServerPool.GetCommunicationTimeout();
262 }
263 
265 {
266 public:
268  const string& key) :
269  m_Service(service),
270  m_Key(key)
271  {
272  }
273 
274  virtual CNetServer BeginIteration();
275  virtual CNetServer NextServer();
276 
277 private:
279  const string& m_Key;
281 };
282 
284 {
286 }
287 
289 {
290  return ++m_Iterator ? *m_Iterator : CNetServer();
291 }
292 
294  const string& cmd,
295  const string& key,
296  bool multiline_output,
297  const CNetCacheAPIParameters* parameters)
298 {
299  CNetServer selected_server(parameters->GetServerToUse());
300  CNetServer* server_last_used_ptr(parameters->GetServerLastUsedPtr());
301 
302  const auto try_all_servers = parameters->GetTryAllServers();
303  CNetServer::SExecResult exec_result;
304 
305  if (!try_all_servers && selected_server) {
306  exec_result = selected_server.ExecWithRetry(cmd, multiline_output);
307 
308  } else if (selected_server) {
309  ESwitch server_check = eDefault;
310  parameters->GetServerCheck(&server_check);
311 
312  SNetCacheMirrorTraversal mirror_traversal(m_Service,
313  selected_server, server_check);
314 
315  m_Service->IterateUntilExecOK(cmd, multiline_output, exec_result,
316  &mirror_traversal, SNetServiceImpl::eIgnoreServerErrors);
317  } else {
318  SWeightedServiceTraversal service_traversal(m_Service, key);
319 
320  m_Service->IterateUntilExecOK(cmd, multiline_output, exec_result,
321  &service_traversal, try_all_servers ?
323  }
324 
325  if (server_last_used_ptr != NULL)
326  *server_last_used_ptr = exec_result.conn->m_Server;
327 
328  return exec_result;
329 }
330 
332 {
333  NCBI_THROW(CNetCacheException, eNotImplemented, "SMR is not implemented");
334 }
335 
336 
338 {
339  NCBI_THROW(CNetCacheException, eNotImplemented, "SMU is not implemented");
340 }
341 
342 
344 {
345  return m_Impl->m_CacheFlags;
346 }
347 
348 
350 {
351  m_Impl->m_CacheFlags = flags;
352 }
353 
354 
356 {
357  NCBI_THROW(CNetCacheException, eNotImplemented, "STSP is not implemented");
358 }
359 
360 
362 {
363  NCBI_THROW(CNetCacheException, eNotImplemented, "GTSP is not implemented");
364 }
365 
366 
368 {
369  NCBI_THROW(CNetCacheException, eNotImplemented, "GTOU is not implemented");
370 }
371 
372 
374 {
375  NCBI_THROW(CNetCacheException, eNotImplemented, "ISOP is not implemented");
376 }
377 
378 
380 {
381  NCBI_THROW(CNetCacheException, eNotImplemented, "SVRP is not implemented");
382 }
383 
384 
386 {
387  NCBI_THROW(CNetCacheException, eNotImplemented, "GVRP is not implemented");
388 }
389 
390 
391 void CNetICacheClient::Store(const string& key,
392  int version,
393  const string& subkey,
394  const void* data,
395  size_t size,
396  unsigned int time_to_live,
397  const string& /*owner*/)
398 {
399  string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
400 
401  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
402 
403  parameters.SetTTL(time_to_live);
405 
406  CNetCacheWriter writer(m_Impl, &blob_id, key,
407  m_Impl->m_CacheFlags & ICache::fBestReliability ?
408  eNetCache_Wait : eICache_NoWait, &parameters);
409 
410  writer.WriteBufferAndClose(reinterpret_cast<const char*>(data), size);
411 }
412 
413 
414 size_t CNetICacheClient::GetSize(const string& key,
415  int version,
416  const string& subkey)
417 {
418  return GetBlobSize(key, version, subkey);
419 }
420 
421 size_t CNetICacheClient::GetBlobSize(const string& key,
422  int version, const string& subkey,
423  const CNamedParameterList* optional)
424 {
425  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
426 
427  parameters.LoadNamedParameters(optional);
428 
429  return CheckBlobSize(NStr::StringToUInt8(m_Impl->ExecStdCmd(
430  "GSIZ", key, version, subkey, &parameters)));
431 }
432 
433 
434 void CNetICacheClient::GetBlobOwner(const string&, int, const string&,
435  string* owner)
436 {
437  ERR_POST("NetCache command 'GBLW' has been phased out");
438  *owner = kEmptyStr;
439 }
440 
442  const string& key, int version, const string& subkey,
443  size_t offset, size_t part_size,
444  size_t* blob_size_ptr, const CNamedParameterList* optional)
445 {
446  try {
447  string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
448 
450 
451  parameters.LoadNamedParameters(optional);
452 
453  const char* cmd_name;
454  string cmd;
455 
456  if (offset == 0 && part_size == 0) {
457  cmd_name = "READ";
458  cmd = MakeStdCmd(cmd_name, blob_id, &parameters);
459  } else {
460  cmd_name = "READPART";
461  cmd = MakeStdCmd(cmd_name, blob_id, &parameters,
462  ' ' + NStr::UInt8ToString((Uint8) offset) +
463  ' ' + NStr::UInt8ToString((Uint8) part_size));
464  }
465 
466  CNetServer::SExecResult exec_result(
467  ChooseServerAndExec(cmd, key, false, &parameters));
468 
469  unsigned* actual_age_ptr = parameters.GetActualBlobAgePtr();
470  if (parameters.GetMaxBlobAge() > 0 && actual_age_ptr != NULL)
471  *actual_age_ptr = x_ExtractBlobAge(exec_result, cmd_name);
472 
473  return new CNetCacheReader(this, blob_id,
474  exec_result, blob_size_ptr, &parameters);
475  }
477  return NULL;
478  }
479  catch (CNetCacheException& e) {
481  throw;
482  return NULL;
483  }
484 }
485 
486 bool CNetICacheClient::Read(const string& key,
487  int version,
488  const string& subkey,
489  void* buf,
490  size_t buf_size)
491 {
492  return ReadPart(key, version, subkey, 0, 0, buf, buf_size);
493 }
494 
495 bool CNetICacheClient::ReadPart(const string& key,
496  int version,
497  const string& subkey,
498  size_t offset,
499  size_t part_size,
500  void* buf,
501  size_t buf_size)
502 {
503  size_t blob_size;
504 
505  unique_ptr<IReader> rdr(m_Impl->GetReadStreamPart(
506  key, version, subkey, offset, part_size,
508 
509  if (rdr.get() == NULL)
510  return false;
511 
512  return SNetCacheAPIImpl::ReadBuffer(*rdr, (char*) buf, buf_size,
513  NULL, blob_size) == CNetCacheAPI::eReadComplete;
514 }
515 
516 
518  int version,
519  const string& subkey,
520  SBlobAccessDescr* blob_descr)
521 {
522  if (blob_descr->return_current_version) {
523  blob_descr->return_current_version_supported = true;
524  blob_descr->reader.reset(m_Impl->ReadCurrentBlobNotOlderThan(
525  key,
526  subkey,
527  &blob_descr->blob_size,
528  &blob_descr->current_version,
529  &blob_descr->current_version_validity,
530  blob_descr->maximum_age,
531  &blob_descr->actual_age));
532  } else if (blob_descr->maximum_age > 0) {
533  blob_descr->reader.reset(m_Impl->GetReadStreamPart(key, version, subkey,
534  0, 0, &blob_descr->blob_size,
536  nc_max_age = blob_descr->maximum_age,
537  nc_actual_age = &blob_descr->actual_age)));
538  } else {
539  blob_descr->reader.reset(m_Impl->GetReadStreamPart(key, version, subkey,
540  0, 0, &blob_descr->blob_size,
542  }
543 
544  if (blob_descr->reader.get() != NULL) {
545  blob_descr->blob_found = true;
546 
547  if (blob_descr->buf && blob_descr->buf_size >= blob_descr->blob_size) {
548  try {
550  blob_descr->buf, blob_descr->buf_size,
551  NULL, blob_descr->blob_size);
552  }
553  catch (CNetServiceException&) {
554  blob_descr->reader.reset(NULL);
555  throw;
556  }
557  blob_descr->reader.reset(NULL);
558  }
559  } else {
560  blob_descr->blob_size = 0;
561  blob_descr->blob_found = false;
562  }
563 }
564 
565 
567  int version,
568  const string& subkey,
569  unsigned int time_to_live,
570  const string& /*owner*/)
571 {
572  return GetNetCacheWriter(key, version, subkey, nc_blob_ttl = time_to_live);
573 }
574 
575 
577  int version, const string& subkey, const CNamedParameterList* optional)
578 {
579  string blob_id(s_KeyVersionSubkeyToBlobID(key, version, subkey));
580 
581  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
582 
583  parameters.LoadNamedParameters(optional);
584 
585  return new CNetCacheWriter(m_Impl, &blob_id, key,
586  m_Impl->m_CacheFlags & ICache::fBestReliability ?
587  eNetCache_Wait : eICache_NoWait, &parameters);
588 }
589 
590 
591 void CNetICacheClient::Remove(const string& key,
592  int version,
593  const string& subkey)
594 {
596 }
597 
599  int version, const string& subkey,
600  const CNamedParameterList* optional)
601 {
602  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
603 
604  parameters.LoadNamedParameters(optional);
605 
606  m_Impl->ExecStdCmd("REMO", key, version, subkey, &parameters);
607 }
608 
609 time_t CNetICacheClient::GetAccessTime(const string&, int, const string&)
610 {
611  NCBI_THROW(CNetCacheException, eNotImplemented, "GACT is not implemented");
612 }
613 
614 
615 bool CNetICacheClient::HasBlobs(const string& key,
616  const string& subkey)
617 {
618  return HasBlob(key, subkey);
619 }
620 
621 bool CNetICacheClient::HasBlob(const string& key, const string& subkey,
622  const CNamedParameterList* optional)
623 {
624  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
625 
626  parameters.LoadNamedParameters(optional);
627 
628  string response(m_Impl->ExecStdCmd("HASB", key, 0, subkey, &parameters));
629  return (response[0] == '1'|| NStr::StartsWith(response, "0, VER="));
630 }
631 
632 void CNetICacheClient::Purge(time_t access_timeout)
633 {
634  Purge(kEmptyStr, kEmptyStr, access_timeout);
635 }
636 
637 
638 void CNetICacheClient::Purge(const string& key,
639  const string& subkey,
640  time_t access_timeout)
641 {
642  if (access_timeout) {
643  NCBI_THROW(CNetCacheException, eNotImplemented, "Not implemented");
644  }
645 
646  if (!subkey.empty()) {
647  if (key.empty()) {
648  NCBI_THROW(CNetCacheException, eNotImplemented, "Not implemented");
649  }
650 
651  return RemoveBlob(key, 0, subkey);
652  }
653 
654  const auto cmd = m_Impl->MakeStdCmd("PURGE2", "'" + key + "'", &m_Impl->m_DefaultParameters);
655  m_Impl->ChooseServerAndExec(cmd, key, false, &m_Impl->m_DefaultParameters);
656 }
657 
659  const string& key,
660  int version,
661  const string& subkey,
662  size_t* blob_size_ptr,
663  const CNamedParameterList* optional)
664 {
666  0, 0, blob_size_ptr, optional);
667 }
668 
670  const string& key,
671  int version,
672  const string& subkey,
673  size_t offset,
674  size_t part_size,
675  size_t* blob_size_ptr,
676  const CNamedParameterList* optional)
677 {
678  return m_Impl->GetReadStreamPart(key, version, subkey,
679  offset, part_size, blob_size_ptr, optional);
680 }
681 
683  int version,
684  const string& subkey)
685 {
686  return GetReadStream(key, version, subkey, NULL,
688 }
689 
691  const string& subkey, int* version,
692  size_t* blob_size_ptr, const CNamedParameterList* optional)
693 {
695  return m_Impl->ReadCurrentBlobNotOlderThan(key, subkey, blob_size_ptr,
696  version, &not_used, 0, NULL, optional);
697 }
698 
700  const string& subkey, int* version,
702 {
703  return m_Impl->ReadCurrentBlobNotOlderThan(key, subkey, NULL,
704  version, validity, 0, NULL);
705 }
706 
708  const string& subkey,
709  size_t* blob_size_ptr,
710  int* version,
712  unsigned max_age, unsigned* actual_age,
713  const CNamedParameterList* optional)
714 {
715  try {
716  string blob_id(s_KeySubkeyToBlobID(key, subkey));
718 
719  parameters.LoadNamedParameters(optional);
720 
721  if (max_age != 0) {
722  parameters.SetMaxBlobAge(max_age);
723  }
724 
725  string cmd = MakeStdCmd("READLAST", blob_id, &parameters);
726 
728  key, false, &m_DefaultParameters));
729 
730  string::size_type pos = exec_result.response.find("VER=");
731 
732  if (pos == string::npos) {
733  NCBI_THROW(CNetCacheException, eInvalidServerResponse,
734  "No VER field in READLAST output");
735  }
736 
738  exec_result.response.c_str() + pos + sizeof("VER=") - 1,
740 
741  pos = exec_result.response.find("VALID=");
742 
743  if (pos == string::npos) {
744  NCBI_THROW(CNetCacheException, eInvalidServerResponse,
745  "No VALID field in READLAST output");
746  }
747 
748  switch (exec_result.response[pos + sizeof("VALID=") - 1]) {
749  case 't': case 'T': case 'y': case 'Y':
750  *validity = ICache::eCurrent;
751  break;
752  case 'f': case 'F': case 'n': case 'N':
753  *validity = ICache::eExpired;
754  break;
755  default:
756  NCBI_THROW(CNetCacheException, eInvalidServerResponse,
757  "Invalid format of the VALID field in READLAST output");
758  }
759 
760  if (max_age > 0)
761  *actual_age = x_ExtractBlobAge(exec_result, "READLAST");
762 
763  return new CNetCacheReader(this, blob_id, exec_result,
764  blob_size_ptr, &m_DefaultParameters);
765  } catch (CNetCacheBlobTooOldException& e) {
766  if (actual_age != NULL)
767  *actual_age = e.GetAge();
768  *version = e.GetVersion();
769 
770  return NULL;
771  }
772  catch (CNetCacheException& e) {
774  throw;
775  return NULL;
776  }
777 }
778 
780 {
781 public:
784  const string& key,
785  const string& subkey,
786  int version) :
787  m_Listener(listener),
788  m_Key(key),
789  m_Subkey(subkey),
791  {
793 
794  auto warning_handler = [&](const string& m, CNetServer s) {
795  return OnWarning(m, s);
796  };
797 
798  m_Listener->SetWarningHandler(warning_handler);
799  }
800 
802  {
803  m_Listener->SetWarningHandler(nullptr);
804  }
805 
806 private:
807  bool OnWarning(const string& warn_msg, CNetServer server);
808 
810  const string m_Key;
811  const string m_Subkey;
812  const int m_Version;
813 };
814 
816 {
817  SIZE_TYPE ver_pos = NStr::FindCase(warn_msg,
818  CTempString("VER=", sizeof("VER=") - 1));
819 
820  if (ver_pos == NPOS)
821  return false;
822  else {
823  int version = atoi(warn_msg.c_str() + ver_pos + sizeof("VER=") - 1);
824  if (version < m_Version) {
825  ERR_POST("Cache actualization error (key \"" << m_Key <<
826  "\", subkey \"" << m_Subkey <<
827  "\"): the cached blob version downgraded from " <<
828  m_Version << " to " << version);
829  }
830  }
831 
832  return true;
833 }
834 
836  const string& subkey, int version)
837 {
838  CSetValidWarningSuppressor warning_suppressor(m_Impl->m_Service->m_Listener, key, subkey, version);
839 
840  CNetServer::SExecResult exec_result(
841  m_Impl->ChooseServerAndExec(
842  m_Impl->MakeStdCmd("SETVALID",
844  &m_Impl->m_DefaultParameters),
845  key,
846  false,
847  &m_Impl->m_DefaultParameters));
848 
849  if (!exec_result.response.empty()) {
850  ERR_POST("SetBlobVersionAsCurrent(\"" << key << "\", " <<
851  version << ", \"" << subkey << "\"): " << exec_result.response);
852  }
853 }
854 
856  int version, const string& subkey,
857  const CNamedParameterList* optional)
858 {
859  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
860 
861  parameters.LoadNamedParameters(optional);
862 
864  m_Impl->ChooseServerAndExec(
865  m_Impl->MakeStdCmd("GETMETA",
867  &parameters),
868  key,
869  true,
870  &parameters));
871 
872  output->SetNetCacheCompatMode();
873 
874  return output;
875 }
876 
878  int version, const string& subkey)
879 {
881 
882  string line;
883 
884  if (output.ReadLine(line)) {
885  if (!NStr::StartsWith(line, "SIZE="))
886  NcbiCout << line << NcbiEndl;
887  while (output.ReadLine(line))
888  NcbiCout << line << NcbiEndl;
889  }
890 }
891 
893 {
894  return m_Impl->m_Service;
895 }
896 
897 string SNetICacheClientImpl::MakeStdCmd(const char* cmd_base,
898  const string& blob_id, const CNetCacheAPIParameters* parameters,
899  const string& injection)
900 {
901  string cmd("IC(" + NStr::PrintableString(parameters->GetCacheName()));
902  cmd.append(") ");
903 
904  cmd.append(cmd_base);
905 
906  cmd.push_back(' ');
907 
908  cmd.append(blob_id);
909 
910  if (!injection.empty())
911  cmd.append(injection);
912 
914 
915  return cmd;
916 }
917 
918 string SNetICacheClientImpl::ExecStdCmd(const char* cmd_base,
919  const string& key, int version, const string& subkey,
920  const CNetCacheAPIParameters* parameters)
921 {
922  return ChooseServerAndExec(
923  MakeStdCmd(cmd_base,
925  parameters),
926  key,
927  false,
928  parameters).response;
929 }
930 
932 {
933  return m_Impl->m_DefaultParameters.GetCacheName();
934 }
935 
936 
938 {
939  return false;
940 }
941 
943 {
944  return new SNetICacheClientImpl(server->m_ServerInPool, m_Impl);
945 }
946 
947 vector<CNetICacheClient::CBlobInfo> CNetICacheClient::Search(
950 {
951  const auto parameters = &m_Impl->m_DefaultParameters;
952  const auto cache_name = NStr::PrintableString(parameters->GetCacheName());
953  string ids;
954  m_Impl->AppendClientIPSessionIDPasswordAgeHitID(&ids, parameters);
955  ostringstream oss;
956  oss << "IC(" << cache_name << ") BLIST2" << expression + filter << ids;
957 
959  m_Impl->ChooseServerAndExec(
960  oss.str(),
961  kEmptyStr,
962  true,
963  &m_Impl->m_DefaultParameters));
964 
965  output->SetNetCacheCompatMode();
966  string line;
967  vector<CNetICacheClient::CBlobInfo> result;
968 
969  while (output.ReadLine(line) && !line.empty()) {
970  CBlobInfo blob_info;
971  blob_info << line;
972  result.push_back(blob_info);
973  }
974 
975  return result;
976 }
977 
978 void CNetICacheClientExt::ProlongBlobLifetime(const string& key, const string& subkey,
979  const CTimeout& ttl, const CNamedParameterList* optional)
980 {
981  CNetCacheAPIParameters parameters(&m_Impl->m_DefaultParameters);
982  parameters.LoadNamedParameters(optional);
983 
984  string cmd("PROLONG \"");
985  cmd += NStr::PrintableString(parameters.GetCacheName());
986  cmd += "\" \"";
987  cmd += key;
988  cmd += "\" \"";
989  cmd += subkey;
990  cmd += "\" ttl=";
991  cmd += NStr::NumericToString((unsigned)ttl.GetAsDouble());
992 
993  m_Impl->AppendClientIPSessionIDHitID(&cmd);
994  m_Impl->ChooseServerAndExec(cmd, key, false, &m_Impl->m_DefaultParameters);
995 }
996 
998 {
999  return new SNetCacheAPIImpl(m_Impl.GetObject());
1000 }
1001 
1003 {
1004  return new SNetCacheAPIImpl(m_Impl.GetObject());
1005 }
1006 
1007 /// Class factory for NetCache implementation of ICache
1008 ///
1009 /// @internal
1010 ///
1011 class CNetICacheCF : public CICacheCF<CNetICacheClient>
1012 {
1013 public:
1015 
1016 public:
1018  {
1019  }
1020 
1021 private:
1022  virtual ICache* x_CreateInstance(
1023  const string& driver = kEmptyStr,
1025  const TPluginManagerParamTree* params = 0) const;
1026 };
1027 
1028 
1030  const string& driver,
1032  const TPluginManagerParamTree* params) const
1033 {
1034  if ((!driver.empty() && driver != m_DriverName) ||
1037  return 0;
1038 
1039  if (!params)
1040  return new CNetICacheClient;
1041 
1042  CConfig conf(params);
1043 
1044  return new CNetICacheClient(&conf, driver);
1045 }
1046 
1047 
1051 {
1053 }
1054 
1055 
1057 {
1058  RegisterEntryPoint<ICache>( NCBI_EntryPoint_xcache_netcache );
1059 }
1060 
Utility class for ICache class factories.
Definition: icache_cf.hpp:53
Exception thrown when the requested blob is older than the requested age.
NetCache internal exception.
Class factory for NetCache implementation of ICache.
virtual ICache * x_CreateInstance(const string &driver=kEmptyStr, CVersionInfo version=NCBI_INTERFACE_VERSION(ICache), const TPluginManagerParamTree *params=0) const
CICacheCF< CNetICacheClient > TParent
Client to NetCache server (implements ICache interface)
SExecResult ExecWithRetry(const string &cmd, bool multiline_output=false)
Execute remote command 'cmd', wait for the reply, check if it starts with 'OK:', and return the remai...
Net Service exception.
CNetServiceIterator IterateByWeight(const string &key)
CSetValidWarningSuppressor(INetServerConnectionListener *listener, const string &key, const string &subkey, int version)
CRef< INetServerConnectionListener > m_Listener
bool OnWarning(const string &warn_msg, CNetServer server)
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
CTimeout – Timeout interval.
Definition: ncbitime.hpp:1693
definition of a Culling tree
Definition: ncbi_tree.hpp:100
CVersionInfo –.
BLOB cache read/write/maintenance interface.
Definition: icache.hpp:64
EKeepVersions
If to keep already cached versions of the BLOB when storing another version of it (not necessarily a ...
Definition: icache.hpp:162
int TFlags
Bitwise OR of "EFlags" flags.
Definition: icache.hpp:89
EBlobVersionValidity
BLOB version existence and validity – from the point of view of the underlying cache implementation.
Definition: icache.hpp:274
@ eExpired
Validity of the BLOB version cannot be confirmed.
Definition: icache.hpp:276
@ eCurrent
The returned BLOB's version is considered valid.
Definition: icache.hpp:275
@ fBestReliability
Usually, it's not a problem if something fails to get cached sometimes.
Definition: icache.hpp:85
int TTimeStampFlags
Holds a bitwise OR of "ETimeStampFlags".
Definition: icache.hpp:128
A very basic data-read interface.
IRegistry –.
Definition: ncbireg.hpp:73
A very basic data-write interface.
NStr –.
Definition: ncbistr.hpp:243
CNetServiceIterator m_Iterator
SWeightedServiceTraversal(CNetService::TInstance service, const string &key)
virtual CNetServer BeginIteration()
virtual CNetServer NextServer()
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
static uch flags
static CS_COMMAND * cmd
Definition: ct_dynamic.c:26
static SQLCHAR output[256]
Definition: print.c:5
int offset
Definition: replacements.h:160
char data[12]
Definition: iconv.c:80
#define NULL
Definition: ncbistd.hpp:225
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
#define nc_caching_mode
Caching mode.
#define nc_actual_age
A pointer to an unsigned variable where the actual age of the blob must be stored.
#define nc_blob_ttl
Blob life span in seconds.
#define nc_max_age
Do not read the blob if its age is greater than the specified value.
CNetService GetService()
virtual time_t GetAccessTime(const string &key, int version, const string &subkey)
Return last access time for the specified cache entry.
IReader * GetReadStream(const string &key, int version, const string &subkey, size_t *blob_size_ptr, const CNamedParameterList *optional=NULL)
Read a lengthy blob via the IReader interface.
ENetCacheResponseType GetResponseType() const
unsigned * GetActualBlobAgePtr() const
void SetCacheName(const string &cache_name)
const string & GetBlobID() const
virtual string GetCacheName(void) const
void RemoveBlob(const string &key, int version, const string &subkey, const CNamedParameterList *optional=NULL)
virtual void Store(const string &key, int version, const string &subkey, const void *data, size_t size, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Add or replace BLOB.
virtual EKeepVersions GetVersionRetention() const
Get version retention.
virtual void SetBlobVersionAsCurrent(const string &key, const string &subkey, int version)
Set current valid version for a BLOB.
CNetServerMultilineCmdOutput GetBlobInfo(const string &key, int version, const string &subkey, const CNamedParameterList *optional=NULL)
Return a CNetServerMultilineCmdOutput object for reading meta information about the specified blob.
virtual void Remove(const string &key, int version, const string &subkey)
Remove specific cache entry.
virtual void SetVersionRetention(EKeepVersions policy)
Set version retention policy.
virtual TTimeStampFlags GetTimeStampPolicy() const
Get timestamp policy.
virtual TFlags GetFlags()
Retrieve the effective combination of flags from the underlying storage.
bool GetServerCheck(ESwitch *server_check) const
grid::netcache::search::CBlobInfo CBlobInfo
void Cache_RegisterDriver_NetCache(void)
grid::netcache::search::CExpression CExpression
void RegisterSession(unsigned pid)
Send session registration command.
IReader * GetReadStreamPart(const string &key, int version, const string &subkey, size_t offset, size_t part_size, size_t *blob_size_ptr, const CNamedParameterList *optional=NULL)
Read data from the specified blob.
CNetServer * GetServerLastUsedPtr() const
STimeout GetCommunicationTimeout() const
bool ReadPart(const string &key, int version, const string &subkey, size_t offset, size_t part_size, void *buf, size_t buf_size)
void SetMaxBlobAge(unsigned max_age)
size_t GetBlobSize(const string &key, int version, const string &subkey, const CNamedParameterList *optional=NULL)
Returns the size of the BLOB identified by the "key", "version", and "subkey" parameters.
void SetTTL(unsigned blob_ttl)
void UnRegisterSession(unsigned pid)
Send session unregistration command.
virtual bool Read(const string &key, int version, const string &subkey, void *buf, size_t buf_size)
void LoadNamedParameters(const CNamedParameterList *optional)
size_t CheckBlobSize(Uint8 blob_size)
Definition: netcache_rw.hpp:58
CNetRef< SNetICacheClientImpl > m_Impl
const char *const kNetICacheDriverName
vector< CBlobInfo > Search(CExpression expression, CFields fields=CFields())
Returns information for all blobs matching provided search expression.
void WriteBufferAndClose(const char *buf_ptr, size_t buf_size)
virtual int GetTimeout() const
Get expiration timeout.
EAppRegistry
Defines how this object must be initialized.
bool HasBlob(const string &key, const string &subkey, const CNamedParameterList *optional=NULL)
CNetServer GetServerToUse() const
void NCBI_EntryPoint_xcache_netcache(CPluginManager< ICache >::TDriverInfoList &info_list, CPluginManager< ICache >::EEntryPointRequest method)
virtual bool HasBlobs(const string &key, const string &subkey)
Check if any BLOB exists (any version)
IEmbeddedStreamWriter * GetNetCacheWriter(const string &key, int version, const string &subkey, const CNamedParameterList *optional=NULL)
Create or update the specified blob.
grid::netcache::search::CFields CFields
virtual IWriter * GetWriteStream(const string &key, int version, const string &subkey, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Return sequential stream interface to write BLOB data.
unsigned GetMaxBlobAge() const
void PrintBlobInfo(const string &key, int version, const string &subkey)
Print meta information about the specified blob.
virtual void GetBlobOwner(const string &key, int version, const string &subkey, string *owner)
Retrieve BLOB owner.
virtual void SetFlags(TFlags flags)
Pass flags to the underlying storage.
void SetTryAllServers(bool try_all_servers)
virtual void GetBlobAccess(const string &key, int version, const string &subkey, SBlobAccessDescr *blob_descr)
Get BLOB access using BlobAccessDescr.
virtual bool SameCacheParams(const TCacheParams *params) const
virtual size_t GetSize(const string &key, int version, const string &subkey)
Check if BLOB exists, return BLOB size.
virtual void Purge(time_t access_timeout)
Delete all BLOBs older than specified.
virtual bool IsOpen() const
void SetCommunicationTimeout(const STimeout &to)
Set communication timeout.
virtual void SetTimeStampPolicy(TTimeStampFlags policy, unsigned int timeout, unsigned int max_timeout=0)
Set timestamp update policy.
const string & GetKey() const
void SetCachingMode(CNetCacheAPI::ECachingMode caching_mode)
@ eICache_NoWait
@ eNetCache_Wait
@ eReadComplete
The whole BLOB has been read.
@ eBlobNotFound
The requested blob was not found.
static void NCBI_EntryPointImpl(TDriverInfoList &info_list, EEntryPointRequest method)
Entry point implementation.
#define NCBI_INTERFACE_VERSION(iface)
Macro to construct CVersionInfo class using interface name (relies on CInterfaceVersion class)
list< SDriverInfo > TDriverInfoList
List of driver information.
EEntryPointRequest
Actions performed by the entry point.
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define NcbiEndl
Definition: ncbistre.hpp:548
#define NcbiCout
Definition: ncbistre.hpp:543
NCBI_NS_STD::string::size_type SIZE_TYPE
Definition: ncbistr.hpp:132
static string PrintableString(const CTempString str, TPrintableMode mode=fNewLine_Quote|fNonAscii_Passthru)
Get a printable version of the specified string.
Definition: ncbistr.cpp:3944
#define kEmptyStr
Definition: ncbistr.hpp:123
#define NPOS
Definition: ncbistr.hpp:133
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5078
static SIZE_TYPE FindCase(const CTempString str, const CTempString pattern, SIZE_TYPE start, SIZE_TYPE end, EOccurrence which=eFirst)
Find the pattern in the specified range of a string using a case sensitive search.
Definition: ncbistr.hpp:5484
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5103
static bool StartsWith(const CTempString str, const CTempString start, ECase use_case=eCase)
Check if a string starts with a specified prefix value.
Definition: ncbistr.hpp:5406
static Uint8 StringToUInt8(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to Uint8.
Definition: ncbistr.cpp:871
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
static enable_if< is_arithmetic< TNumeric >::value||is_convertible< TNumeric, Int8 >::value, string >::type NumericToString(TNumeric value, TNumToStringFlags flags=0, int base=10)
Convert numeric value to string.
Definition: ncbistr.hpp:673
static string UInt8ToString(Uint8 value, TNumToStringFlags flags=0, int base=10)
Convert UInt8 to string.
Definition: ncbistr.hpp:5162
@ fAllowTrailingSymbols
Ignore trailing non-numeric characters.
Definition: ncbistr.hpp:298
double GetAsDouble(void) const
Get as number of seconds (fractional value).
Definition: ncbitime.cpp:3512
enum ENcbiSwitch ESwitch
Auxiliary switch type (typedef of enum ENcbiSwitch).
@ eNonCompatible
Major and/or minor version numbers do not match.
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
Definition of all error codes used in the connect services library (xconnserv.lib and others).
char * buf
const string version
version string
Definition: variables.hpp:66
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const struct ncbi::grid::netcache::search::fields::SUBKEY subkey
Static variables safety - create on demand, destroy on application termination.
Defines: CTimeFormat - storage class for time format.
NetCache API exception declarations.
#define MAX_ICACHE_CACHE_NAME_LENGTH
#define MAX_ICACHE_SUBKEY_LENGTH
static string s_KeyVersionSubkeyToBlobID(const string &key, int version, const string &subkey)
static string s_KeySubkeyToBlobID(const string &key, const string &subkey)
#define MAX_ICACHE_KEY_LENGTH
static const char s_NetICacheAPIName[]
static string s_CheckKeySubkey(const string &key, const string &subkey, string *encoded_key)
NetCache ICache client specs.
Helper classes and templates to implement plugins.
Defines CRequestContext class for NCBI C++ diagnostic API.
IReader * GetReadStream(const string &key, int version, const string &subkey, size_t *blob_size_ptr, const CNamedParameterList *optional=NULL)
Read a lengthy blob via the IReader interface.
void ProlongBlobLifetime(const string &key, const string &subkey, const CTimeout &ttl, const CNamedParameterList *optional=NULL)
SNetCacheAPIImpl * GetNetCacheAPI()
CNetICacheClientExt GetServer(CNetServer::TInstance server)
CNetServerConnection conn
BLOB access descriptor.
Definition: icache.hpp:332
unsigned maximum_age
Set to a non-zero value to return a version not older than the specified value.
Definition: icache.hpp:355
unsigned actual_age
If `maximum_age` is not zero, GetBlobAccess() will set this field to the actual blob version age upon...
Definition: icache.hpp:359
unique_ptr< IReader > reader
Definition: icache.hpp:347
EBlobVersionValidity current_version_validity
If `return_current_version` is set, the `current_version_validity` field will contain the validity of...
Definition: icache.hpp:376
bool return_current_version_supported
If TRUE, the ICache instance supports return_current_version.
Definition: icache.hpp:362
TBlobVersion current_version
If `return_current_version` is set, the current version number of the blob is stored in the `current_...
Definition: icache.hpp:372
bool return_current_version
If TRUE, the `version` argument of GetBlobAccess() will be ignored and the `current_version` and `cur...
Definition: icache.hpp:368
void SetWarningHandler(TEventHandler warning_handler)
void AppendClientIPSessionIDPasswordAgeHitID(string *cmd, const CNetCacheAPIParameters *parameters)
CNetCacheAPIParameters m_DefaultParameters
unsigned x_ExtractBlobAge(const CNetServer::SExecResult &exec_result, const char *cmd_name)
static CNetCacheAPI::EReadResult ReadBuffer(IReader &reader, char *buf_ptr, size_t buf_size, size_t *n_read, size_t blob_size)
void Init(CSynRegistry &registry, const SRegSynonyms &sections)
CNetServer::SExecResult ChooseServerAndExec(const string &cmd, const string &key, bool multiline_output, const CNetCacheAPIParameters *parameters)
void Init(CSynRegistry &registry, const SRegSynonyms &sections)
IReader * ReadCurrentBlobNotOlderThan(const string &key, const string &subkey, size_t *blob_size_ptr, int *version, ICache::EBlobVersionValidity *validity, unsigned max_age, unsigned *actual_age, const CNamedParameterList *optional=NULL)
ICache::TFlags m_CacheFlags
string MakeStdCmd(const char *cmd_base, const string &blob_id, const CNetCacheAPIParameters *parameters, const string &injection=kEmptyStr)
SNetICacheClientImpl(CSynRegistryBuilder registry_builder, const string &section, const string &service_name, const string &client_name, const string &cache_name)
IReader * GetReadStreamPart(const string &key, int version, const string &subkey, size_t offset, size_t part_size, size_t *blob_size_ptr, const CNamedParameterList *optional)
string ExecStdCmd(const char *cmd_base, const string &key, int version, const string &subkey, const CNetCacheAPIParameters *parameters)
SNetICacheClientImpl(SNetServerInPool *server, SNetICacheClientImpl *parent)
virtual CNetServerConnection InitiateWriteCmd(CNetCacheWriter *nc_writer, const CNetCacheAPIParameters *parameters)
CRef< SNetServerInPool > m_ServerInPool
static SNetServiceImpl * Create(const string &api_name, const string &service_name, const string &client_name, INetServerConnectionListener *listener, CSynRegistry &registry, SRegSynonyms &sections, const string &ns_client_name=kEmptyStr)
void IterateUntilExecOK(const string &cmd, bool multiline_output, CNetServer::SExecResult &exec_result, IServiceTraversal *service_traversal, EServerErrorHandling error_handling)
Timeout structure.
Definition: ncbi_types.h:76
#define _ASSERT
else result
Definition: token2.c:20
Reader writer with transmission checking.
Modified on Fri Sep 20 14:58:14 2024 by modify_doxy.py rev. 669887