NCBI C++ ToolKit
blob_storage.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: blob_storage.cpp 101703 2024-01-29 15:34:46Z saprykin $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  * The functionality not directly related to blob operations
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
37 #include <connect/ncbi_socket.hpp>
38 
42 
44 
45 const char* const SBlobStorageConstants::kChunkTableDefault = "blob_chunk";
46 const char* const SBlobStorageConstants::kChunkTableBig = "big_blob_chunk";
47 
49 
51 constexpr int kSatInfoReadRetry{5};
52 
53 bool CanRetry(CCassandraException const& e, int retries)
54 {
55  return
56  (
59  )
60  && retries > 0;
61 }
62 
63 vector<SSatInfoEntry>
65  string const& keyspace, string const& domain,
66  shared_ptr<CCassConnection> connection,
67  optional<chrono::milliseconds> timeout
68 ) {
69  vector<SSatInfoEntry> result;
70  for (int i = kSatInfoReadRetry; i >= 0; --i) {
71  try {
72  auto query = connection->NewQuery();
73  if (timeout) {
74  query->UsePerRequestTimeout(true);
75  query->SetTimeout(timeout.value().count());
76  }
77  query->SetSQL(
78  "SELECT sat, keyspace_name, schema_type, service, flags FROM "
79  + keyspace + ".sat2keyspace WHERE domain = ?", 1);
80  query->BindStr(0, domain);
81  query->Query(kSatInfoReadConsistency, false, false);
82  while (query->NextRow() == ar_dataready) {
84  row.sat = query->FieldGetInt32Value(0);
85  row.keyspace = query->FieldGetStrValue(1);
86  row.schema_type = static_cast<ECassSchemaType>(query->FieldGetInt32Value(2));
87  row.service = query->FieldGetStrValueDef(3, "");
88  row.flags = query->FieldGetInt64Value(4, 0);
89  if (row.schema_type <= eUnknownSchema || row.schema_type > eMaxSchema) {
90  // ignoring
91  }
92  else {
93  result.push_back(row);
94  }
95  }
96  }
97  catch (CCassandraException const& e) {
98  if (!CanRetry(e, i)) {
99  throw;
100  }
101  }
102  break;
103  }
104 
105  sort(begin(result), end(result),
106  [](SSatInfoEntry const& a, SSatInfoEntry const& b)
107  {
108  return a.sat < b.sat;
109  }
110  );
111 
112  return result;
113 }
114 
115 shared_ptr<CPSGMessages>
117  string const& keyspace, string const& domain,
118  shared_ptr<CCassConnection> connection,
119  optional<chrono::milliseconds> timeout
120 ) {
121  auto result = make_shared<CPSGMessages>();
122  for (int i = kSatInfoReadRetry; i >= 0; --i) {
123  try {
124  result->Clear();
125  auto query = connection->NewQuery();
126  if (timeout) {
127  query->UsePerRequestTimeout(true);
128  query->SetTimeout(timeout.value().count());
129  }
130  query->SetSQL("SELECT name, value FROM " + keyspace + ".messages WHERE domain = ?", 1);
131  query->BindStr(0, domain);
132  query->Query(kSatInfoReadConsistency, false, false);
133  while (query->NextRow() == ar_dataready) {
134  result->Set(
135  query->FieldGetStrValue(0),
136  query->FieldGetStrValueDef(1, "")
137  );
138  }
139  break;
140  }
141  catch (CCassandraException const& e) {
142  if (!CanRetry(e, i)) {
143  throw;
144  }
145  }
146  }
147  return result;
148 }
149 
151  string const& keyspace, int32_t sat,
152  shared_ptr<CCassConnection> connection,
153  optional<chrono::milliseconds> timeout
154 ) {
156  for (int i = kSatInfoReadRetry; i >= 0; --i) {
157  try {
158  result.clear();
159  auto query = connection->NewQuery();
160  if (timeout) {
161  query->UsePerRequestTimeout(true);
162  query->SetTimeout(timeout.value().count());
163  }
164  query->SetSQL("SELECT username FROM " + keyspace + ".web_user WHERE sat = ?", 1);
165  query->BindInt32(0, sat);
166  query->Query(kSatInfoReadConsistency, false, false);
167  while (query->NextRow() == ar_dataready) {
168  auto username = query->FieldGetStrValueDef(0, "");
169  if (!username.empty()) {
170  result.insert(username);
171  }
172  }
173  break;
174  }
175  catch (CCassandraException const& e) {
176  if (!CanRetry(e, i)) {
177  throw;
178  }
179  }
180  }
181  return result;
182 }
183 
184 string GetAddressString(string const& host, bool is_host)
185 {
186  if (is_host && !CSocketAPI::isip(host, false)) {
187  auto addr = CSocketAPI::gethostbyname(host);
188  if (addr == 0) {
189  return "";
190  }
191  return CSocketAPI::HostPortToString(addr, 0);
192  }
193  return host;
194 }
195 
196 template <class T>
197 inline void hash_combine(size_t& seed, const T& v)
198 {
199  std::hash<T> hasher;
200  seed ^= hasher(v) + 0x9e3779b9 + (seed<<6) + (seed>>2);
201 }
202 
204  vector<SSatInfoEntry> const& rows,
205  string const& secure_registry_section,
206  map<int32_t, set<string>> secure_users
207 )
208 {
209  size_t result{0};
210  hash_combine(result, secure_registry_section);
211  for (auto const& row : rows) {
212  hash_combine(result, row.sat);
213  hash_combine(result, row.keyspace);
214  hash_combine(result, row.schema_type);
215  hash_combine(result, row.service);
216  hash_combine(result, row.flags);
217  if (row.IsSecureSat()) {
218  for (auto user: secure_users[row.sat]) {
219  hash_combine(result, user);
220  }
221  }
222  }
223  return result;
224 }
225 
226 
227 shared_ptr<CCassConnection>
229  shared_ptr<IRegistry const> const& registry,
230  string const& section,
231  string const& service,
232  bool reset_namespace
233 )
234 {
235  auto factory = CCassConnectionFactory::s_Create();
236  factory->LoadConfig(registry.get(), section);
237  if (!service.empty()) {
238  factory->SetServiceName(service);
239  }
240  if (reset_namespace) {
241  factory->SetDataNamespace("");
242  }
243  auto connection = factory->CreateInstance();
244  connection->Connect();
245  return connection;
246 }
247 
248 inline string GetServiceKey(string const& service, string const& registry_section)
249 {
250  return registry_section + "|" + service;
251 }
252 
253 inline string GetConnectionPointKey(string const& peer, int16_t port, string const& registry_section)
254 {
255  return registry_section + "|" + peer + ":" + to_string(port);
256 }
257 
258 END_SCOPE()
259 
260 optional<SSatInfoEntry> CSatInfoSchema::GetBlobKeyspace(int32_t sat) const
261 {
262  auto itr = m_BlobKeyspaces.find(sat);
263  if (
264  itr != cend(m_BlobKeyspaces)
265  && (
266  itr->second.schema_type == eBlobVer2Schema
267  || itr->second.schema_type == eNamedAnnotationsSchema
268  )
269  ) {
270  return itr->second;
271  }
272  return {};
273 }
274 
276 {
277  auto itr = crbegin(m_BlobKeyspaces);
278  return itr == crend(m_BlobKeyspaces) ? -1 : itr->first;
279 }
280 
281 vector<SSatInfoEntry> CSatInfoSchema::GetNAKeyspaces() const
282 {
283  return m_BioseqNaKeyspaces;
284 }
285 
287 {
288  return m_ResolverKeyspace;
289 }
290 
291 optional<SSatInfoEntry> CSatInfoSchema::GetIPGKeyspace() const
292 {
293  return m_IPGKeyspace;
294 }
295 
296 shared_ptr<CCassConnection> CSatInfoSchema::x_GetConnectionByService(string const& service, string const& registry_section) const
297 {
298  auto itr = m_Service2Cluster.find(GetServiceKey(service, registry_section));
299  return itr == cend(m_Service2Cluster) ? nullptr : itr->second;
300 }
301 
302 shared_ptr<CCassConnection> CSatInfoSchema::x_GetConnectionByConnectionPoint(string const& connection_point) const
303 {
304  auto itr = m_Point2Cluster.find(connection_point);
305  return itr == cend(m_Point2Cluster) ? nullptr : itr->second;
306 }
307 
308 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_AddConnection(
309  shared_ptr<CCassConnection> const& connection,
310  string const& registry_section,
311  string const& service,
312  bool is_default
313 )
314 {
315  unsigned int timeout = m_ResolveTimeout ? m_ResolveTimeout.value().count() : 0;
316  for (auto peer : connection->GetLocalPeersAddressList("", timeout)) {
317  m_Point2Cluster[GetConnectionPointKey(peer, connection->GetPort(), registry_section)] = connection;
318  }
319  if (is_default) {
320  m_DefaultConnection = connection;
321  m_DefaultRegistrySection = registry_section;
322  }
323  else {
324  m_Service2Cluster[GetServiceKey(service, registry_section)] = connection;
325  }
326  return {};
327 }
328 
329 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_ResolveServiceName(
330  string const& service, string const& registry_section, vector<string>& connection_points)
331 {
332  connection_points.clear();
333  {
334  class CInPlaceConnIniter : protected CConnIniter
335  {} conn_initer; /*NCBI_FAKE_WARNING*/
336  }
337 
338  bool is_hostlist = (service.find(':') != string::npos)
339  || (service.find(' ') != string::npos)
340  || (service.find(',') != string::npos);
341 
342  string hosts;
343  if (!is_hostlist) {
344  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName uses service name: '" << service << "'");
345  hosts = LbsmLookup::s_Resolve(service, ',');
346  if (hosts.empty()) {
347  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName failed to resolve LBSM service name: '" << service << "'");
349  }
350  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName resolved service name: '" << service << "' => '" << hosts << "'");
351  }
352  else {
353  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName uses host list: '" << service << "'");
354  hosts = service;
355  }
356 
357  vector<string> items;
359  for (auto item : items) {
360  string item_host;
361  string item_port_token;
362  if (NStr::SplitInTwo(item, ":", item_host, item_port_token)) {
363  int16_t item_port = NStr::StringToNumeric<short>(item_port_token, NStr::fConvErr_NoThrow);
364  item_port = item_port ? item_port : CCassConnection::kCassDefaultPort;
365  item_host = GetAddressString(item_host, is_hostlist);
366  if (item_host.empty()) {
368  }
369  connection_points.push_back(GetConnectionPointKey(item_host, item_port, registry_section));
370  }
371  else {
372  item = GetAddressString(item, is_hostlist);
373  if (item.empty()) {
375  }
376  connection_points.push_back(
377  GetConnectionPointKey(item_host, CCassConnection::kCassDefaultPort, registry_section)
378  );
379  }
380  }
381  return {};
382 }
383 
384 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_ResolveConnectionByServiceName(
385  string service,
386  shared_ptr<CSatInfoSchema> const& old_schema,
387  shared_ptr<IRegistry const> const& registry,
388  string const& registry_section,
389  shared_ptr<CCassConnection>& connection
390 )
391 {
392  // Check this schema data
393  if (service.empty()) {
394  if (registry_section == m_DefaultRegistrySection) {
395  connection = m_DefaultConnection;
396  return {};
397  }
398  else {
399  service = registry->Get(registry_section, "service");
400  }
401  }
402  if (service.empty() && registry_section == m_DefaultRegistrySection) {
403  connection = m_DefaultConnection;
404 
405  }
406  connection = x_GetConnectionByService(service, registry_section);
407  if (connection) {
408  return {};
409  }
410  vector<string> connection_points;
411  auto result = x_ResolveServiceName(service, registry_section, connection_points);
412  if (result.has_value()) {
413  return result;
414  }
415  for (auto const& connection_point : connection_points) {
416  connection = x_GetConnectionByConnectionPoint(connection_point);
417  if (connection) {
418  m_Service2Cluster.emplace(GetServiceKey(service, registry_section), connection);
419  return {};
420  }
421  }
422 
423  // Check previous schema version
424  if (old_schema) {
425  connection = old_schema->x_GetConnectionByService(service, registry_section);
426  if (connection) {
427  x_AddConnection(connection, registry_section, service, false);
428  return {};
429  }
430  for (auto const& connection_point : connection_points) {
431  connection = old_schema->x_GetConnectionByConnectionPoint(connection_point);
432  if (connection) {
433  x_AddConnection(connection, registry_section, service, false);
434  return {};
435  }
436  }
437  }
438 
439  // Make NEW connection
440  {
441  connection = MakeCassConnection(registry, registry_section, service, true);
442  x_AddConnection(connection, registry_section, service, false);
443  }
444 
445  return {};
446 }
447 
448 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_AddSatInfoEntry(
449  SSatInfoEntry entry,
450  shared_ptr<CSatInfoSchema> const& old_schema,
451  shared_ptr<IRegistry const> const& registry,
452  string const& registry_section,
453  set<string> const& secure_users
454 )
455 {
456  shared_ptr<CCassConnection> connection;
457  auto result = x_ResolveConnectionByServiceName(entry.service, old_schema, registry, registry_section, connection);
458  if (result.has_value()) {
459  return result;
460  }
461  if (!secure_users.empty()) {
462  entry.m_SecureUsers = secure_users;
463  m_SecureSatUsers[entry.sat] = secure_users;
464  }
465  if (entry.IsSecureSat()) {
466  entry.connection = nullptr;
467  entry.m_SecureConnection = move(connection);
468  }
469  else {
470  entry.connection = move(connection);
471  entry.m_SecureConnection = nullptr;
472  }
473 
474  // Temporary restriction. Until PSG needs/supports those types of secure satellites
475  if (entry.IsSecureSat() && entry.schema_type != eBlobVer2Schema)
476  {
478  }
479  switch(entry.schema_type) {
480  case eResolverSchema: {
481  if (!m_ResolverKeyspace.keyspace.empty()) {
483  }
484  m_ResolverKeyspace = move(entry);
485  break;
486  }
488  m_BlobKeyspaces.emplace(entry.sat, entry);
489  m_BioseqNaKeyspaces.push_back(move(entry));
490  break;
491  }
492  case eBlobVer1Schema:
493  case eBlobVer2Schema:
494  {
495  m_BlobKeyspaces.emplace(entry.sat, move(entry));
496  break;
497  }
498  case eIPGSchema:
499  {
500  m_IPGKeyspace = make_optional(move(entry));
501  break;
502  }
503  case eUnknownSchema: // LCOV_EXCL_LINE
504  break; // LCOV_EXCL_LINE
505  }
506  return {};
507 }
508 
509 string SSatInfoEntry::ToString(string const& prefix) const
510 {
511  stringstream s;
512  for (auto user : m_SecureUsers) {
513  s << "\n" << prefix << " - '" << user << "'";
514  }
515 
516  string secure_users = s.str();
517  secure_users = secure_users.empty() ? "{}" : secure_users;
518  return prefix + "sat: " + to_string(sat)
519  + ", schema: "+ to_string(static_cast<int>(schema_type))
520  + ", keyspace: '" + keyspace + "'"
521  + ", service: '" + service + "'"
522  + ", flags: " + to_string(flags)
523  + ", connection: &" + to_string(reinterpret_cast<intptr_t>(connection.get()))
524  + ", secure_connection: &" + to_string(reinterpret_cast<intptr_t>(m_SecureConnection.get()))
525  + ", secure_users: " + secure_users;
526 }
527 
528 shared_ptr<CCassConnection> SSatInfoEntry::GetSecureConnection(string const& username) const
529 {
530  if (!IsSecureSat()) {
531  NCBI_THROW(CCassandraException, eGeneric, "SSatInfoEntry::GetSecureConnection() should not be called for NON secure satellites");
532  }
533  if (username.empty() || m_SecureUsers.empty()) {
534  return nullptr;
535  }
536  auto user_entry = m_SecureUsers.find(username);
537  bool user_allowed_to_read = user_entry != cend(m_SecureUsers);
538  if (user_allowed_to_read) {
539  return m_SecureConnection;
540  }
541  else {
542  return nullptr;
543  }
544 }
545 
546 shared_ptr<CCassConnection> SSatInfoEntry::GetConnection() const
547 {
548  if (IsSecureSat()) {
549  NCBI_THROW(CCassandraException, eGeneric, "SSatInfoEntry::GetConnection() should not be called for secure satellites");
550  }
551  return this->connection;
552 }
553 
555 {
556  stringstream s;
557  s << "Blob keyspaces: \n";
558  for (auto sat : m_BlobKeyspaces) {
559  s << sat.second.ToString(" ") << "\n";
560  }
561  s << "BioseqNA keyspaces: \n";
562  for (auto sat : m_BioseqNaKeyspaces) {
563  s << sat.ToString(" ") << "\n";
564  }
565  s << "Resolver keyspace: \n";
566  s << m_ResolverKeyspace.ToString(" ") << "\n";
567  s << "IPG keyspace: \n";
568  s << ((m_IPGKeyspace) ? m_IPGKeyspace->ToString(" ") : " None") << "\n";
569  s << "Secure sat users: \n";
570  for (auto sat_users : m_SecureSatUsers) {
571  s << " " << sat_users.first << "\n";
572  for (auto user : sat_users.second) {
573  s << " - '" << user << "'\n";
574  }
575  }
576  s << "Default connection: &" << to_string(reinterpret_cast<intptr_t>(m_DefaultConnection.get())) << "\n";
577  s << "Default registry section: '" << m_DefaultRegistrySection << "'\n";
578  s << "m_Point2Cluster: \n";
579  for (auto item : m_Point2Cluster) {
580  s << " " << item.first << " : &" << to_string(reinterpret_cast<intptr_t>(item.second.get())) << "\n";
581  }
582  s << "m_Service2Cluster: \n";
583  for (auto item : m_Service2Cluster) {
584  s << " " << item.first << " : &" << to_string(reinterpret_cast<intptr_t>(item.second.get())) << "\n";
585  }
586  return s.str();
587 }
588 
590  string const& sat_info_keyspace,
591  string const& domain,
592  shared_ptr<CCassConnection> sat_info_connection,
593  shared_ptr<IRegistry const> registry,
594  string const& registry_section
595 )
596  : m_SatInfoKeyspace(sat_info_keyspace)
597  , m_Domain(domain)
598  , m_SatInfoConnection(move(sat_info_connection))
599  , m_Registry(move(registry))
600  , m_RegistrySection(registry_section)
601 {
602  if (m_SatInfoConnection == nullptr) {
603  NCBI_THROW(CCassandraException, eFatal, "CSatInfoSchemaProvider() Cassandra connection should not be nullptr");
604  }
605 }
606 
607 void CSatInfoSchemaProvider::SetSatInfoConnection(shared_ptr<CCassConnection> sat_info_connection)
608 {
609  atomic_store(&m_SatInfoConnection, move(sat_info_connection));
610 }
611 
612 shared_ptr<CCassConnection> CSatInfoSchemaProvider::x_GetSatInfoConnection() const
613 {
614  return atomic_load(&m_SatInfoConnection);
615 }
616 
617 optional<SSatInfoEntry> CSatInfoSchemaProvider::GetBlobKeyspace(int32_t sat) const
618 {
619  auto p = GetSchema();
620  return p ? p->GetBlobKeyspace(sat) : nullopt;
621 }
622 
623 vector<SSatInfoEntry> CSatInfoSchemaProvider::GetNAKeyspaces() const
624 {
625  auto p = GetSchema();
626  return p ? p->GetNAKeyspaces() : vector<SSatInfoEntry>();
627 }
628 
630 {
631  auto p = GetSchema();
632  return p ? p->GetResolverKeyspace() : SSatInfoEntry();
633 }
634 
635 optional<SSatInfoEntry> CSatInfoSchemaProvider::GetIPGKeyspace() const
636 {
637  auto p = GetSchema();
638  return p ? p->GetIPGKeyspace() : nullopt;
639 }
640 
642 {
643  auto p = GetSchema();
644  return p ? p->GetMaxBlobKeyspaceSat() : -1;
645 }
646 
647 string CSatInfoSchemaProvider::GetMessage(string const& name) const
648 {
649  auto p = GetMessages();
650  return p ? p->Get(name) : "";
651 }
652 
653 shared_ptr<CSatInfoSchema> CSatInfoSchemaProvider::GetSchema() const
654 {
655  return atomic_load(&m_SatInfoSchema);
656 }
657 
658 shared_ptr<CPSGMessages> CSatInfoSchemaProvider::GetMessages() const
659 {
660  return atomic_load(&m_SatInfoMessages);
661 }
662 
664 {
665  if (m_SatInfoKeyspace.empty()) {
666  x_SetRefreshErrorMessage("mapping_keyspace is not specified");
668  }
670  if (rows.empty()) {
671  x_SetRefreshErrorMessage(m_SatInfoKeyspace + ".sat2keyspace info is empty");
673  }
674  map<int32_t, set<string>> secure_users;
675  if (!m_SecureSatRegistrySection.empty()) {
676  auto secure_connection = MakeCassConnection(m_Registry, m_SecureSatRegistrySection, "", false);
677  for (auto sat_info: rows) {
678  if (sat_info.IsSecureSat()) {
679  secure_users[sat_info.sat] = ReadSecureSatUsers(
680  secure_connection->Keyspace(), sat_info.sat, secure_connection, m_Timeout
681  );
682  }
683  }
684  }
685  auto rows_hash = HashSatInfoData(rows, m_SecureSatRegistrySection, secure_users);
686  if (rows_hash == m_SatInfoHash) {
688  }
689  else if (!apply) {
691  }
692  auto schema = make_shared<CSatInfoSchema>();
693  schema->m_ResolveTimeout = m_Timeout;
694  auto old_schema = GetSchema();
695  auto result = x_PopulateNewSchema(schema, old_schema, move(rows), move(secure_users));
696  if (result.has_value()) {
697  return result.value();
698  }
699  atomic_store(&m_SatInfoSchema, move(schema));
700  m_SatInfoHash = rows_hash;
702 }
703 
704 optional<ESatInfoRefreshSchemaResult> CSatInfoSchemaProvider::x_PopulateNewSchema(
705  shared_ptr<CSatInfoSchema>& new_schema,
706  shared_ptr<CSatInfoSchema> const& old_schema,
707  vector<SSatInfoEntry>&& sat_info,
708  map<int32_t, set<string>>&& secure_users
709 )
710 {
711  auto result = new_schema->x_AddConnection(x_GetSatInfoConnection(), m_RegistrySection, "", true);
712  if (result.has_value()) {
713  return result.value();
714  }
715  for (auto& entry : sat_info) {
716  string registry_section = m_RegistrySection;
717  if (entry.IsSecureSat() && !m_SecureSatRegistrySection.empty()) {
718  registry_section = m_SecureSatRegistrySection;
719  }
720  auto sat_secure_users = secure_users[entry.sat];
721  auto result = new_schema->x_AddSatInfoEntry(entry, old_schema, m_Registry, registry_section, sat_secure_users);
722  if (result.has_value()) {
723  switch(result.value()) {
725  x_SetRefreshErrorMessage("More than one resolver keyspace in the " +
726  m_SatInfoKeyspace + ".sat2keyspace table");
727  break;
729  x_SetRefreshErrorMessage("Cannot resolve service name: '" + entry.service + "'");
730  break;
732  x_SetRefreshErrorMessage("Satellite with id (" + to_string(entry.sat) + ") cannot be configured as secure");
733  break;
734  default:
735  x_SetRefreshErrorMessage("Unexpected result for SatInfoEntry processing: "
736  + to_string(static_cast<int64_t>(result.value())));
737  }
738  return result.value();
739  }
740  }
741  if (
743  (new_schema->m_ResolverKeyspace.keyspace.empty() || !new_schema->m_ResolverKeyspace.connection)
744  ) {
745  x_SetRefreshErrorMessage("resolver schema is not found in sat2keyspace");
747  }
748  if (new_schema->GetMaxBlobKeyspaceSat() == -1) {
749  x_SetRefreshErrorMessage("sat2keyspace is incomplete");
751  }
752  return {};
753 }
754 
756 {
757  if (m_SatInfoKeyspace.empty()) {
758  x_SetRefreshErrorMessage("mapping_keyspace is not specified");
760  }
762  if (messages->IsEmpty()) {
763  x_SetRefreshErrorMessage(m_SatInfoKeyspace + ".messages info is empty");
765  }
766 
767  auto old_messages = GetMessages();
768  if (old_messages && *old_messages == *messages) {
770  }
771  else if (!apply) {
773  }
774  atomic_store(&m_SatInfoMessages, move(messages));
776 }
777 
779 {
780  auto p = atomic_load(&m_RefreshErrorMessage);
781  return p ? *p : "";
782 }
783 
785 {
786  auto msg = make_shared<string>(message);
787  atomic_store(&m_RefreshErrorMessage, move(msg));
788 }
789 
790 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
CassConsistency TCassConsistency
Definition: cass_driver.hpp:98
@ ar_dataready
Definition: cass_driver.hpp:70
static shared_ptr< CCassConnectionFactory > s_Create()
static constexpr int16_t kCassDefaultPort
static constexpr TCassConsistency kLocalQuorum
Helper hook-up class that installs default logging/registry/locking (but only if they have not yet be...
shared_ptr< CPSGMessages > m_SatInfoMessages
ESatInfoRefreshMessagesResult RefreshMessages(bool apply)
Refresh information for messages database {sat_info3.messages}.
optional< SSatInfoEntry > GetBlobKeyspace(int32_t sat) const
Get blob keyspace connection by sat id.
shared_ptr< CSatInfoSchema > GetSchema() const
Get configuration schema snapshot.
ESatInfoRefreshSchemaResult RefreshSchema(bool apply)
Refresh information for configuration database {sat_info3.sat2keyspace}.
CSatInfoSchemaProvider(string const &sat_info_keyspace, string const &domain, shared_ptr< CCassConnection > sat_info_connection, shared_ptr< IRegistry const > registry, string const &registry_section)
vector< SSatInfoEntry > GetNAKeyspaces() const
Get list of BioseqNA keyspaces connections.
optional< chrono::milliseconds > m_Timeout
shared_ptr< CCassConnection > x_GetSatInfoConnection() const
shared_ptr< CPSGMessages > GetMessages() const
Get messages snapshot.
void SetSatInfoConnection(shared_ptr< CCassConnection > sat_info_connection)
Changes Cassandra connection used to communicate with sat_info3.
string GetMessage(string const &name) const
Get configured message by name.
SSatInfoEntry GetResolverKeyspace() const
Get connection to resolver keyspace.
shared_ptr< IRegistry const > m_Registry
shared_ptr< string > m_RefreshErrorMessage
shared_ptr< CSatInfoSchema > m_SatInfoSchema
string GetRefreshErrorMessage() const
Get detailed message for last refresh operation (common for RefreshSchema and RefreshMessages).
shared_ptr< CCassConnection > m_SatInfoConnection
void x_SetRefreshErrorMessage(string const &message)
optional< ESatInfoRefreshSchemaResult > x_PopulateNewSchema(shared_ptr< CSatInfoSchema > &new_schema, shared_ptr< CSatInfoSchema > const &old_schema, vector< SSatInfoEntry > &&sat_info, map< int32_t, set< string >> &&secure_users)
int32_t GetMaxBlobKeyspaceSat() const
Get max id value for existing blob sat.
optional< SSatInfoEntry > GetIPGKeyspace() const
Get connection to IPG keyspace.
map< string, shared_ptr< CCassConnection > > m_Service2Cluster
shared_ptr< CCassConnection > x_GetConnectionByService(string const &service, string const &registry_section) const
optional< ESatInfoRefreshSchemaResult > x_AddConnection(shared_ptr< CCassConnection > const &connection, string const &registry_section, string const &service, bool is_default)
map< string, shared_ptr< CCassConnection > > m_Point2Cluster
optional< SSatInfoEntry > GetIPGKeyspace() const
Get connection to IPG keyspace.
optional< ESatInfoRefreshSchemaResult > x_AddSatInfoEntry(SSatInfoEntry entry, shared_ptr< CSatInfoSchema > const &old_schema, shared_ptr< IRegistry const > const &registry, string const &registry_section, set< string > const &secure_users)
optional< ESatInfoRefreshSchemaResult > x_ResolveServiceName(string const &service, string const &registry_section, vector< string > &connection_points)
int32_t GetMaxBlobKeyspaceSat() const
Get max id value for existing blob sat.
vector< SSatInfoEntry > GetNAKeyspaces() const
Get list of BioseqNA keyspaces connections.
shared_ptr< CCassConnection > m_DefaultConnection
map< int32_t, set< string > > m_SecureSatUsers
shared_ptr< CCassConnection > x_GetConnectionByConnectionPoint(string const &connection_point) const
SSatInfoEntry m_ResolverKeyspace
vector< SSatInfoEntry > m_BioseqNaKeyspaces
string ToString() const
Print internal state of CSatInfoSchema.
optional< ESatInfoRefreshSchemaResult > x_ResolveConnectionByServiceName(string service, shared_ptr< CSatInfoSchema > const &old_schema, shared_ptr< IRegistry const > const &registry, string const &registry_section, shared_ptr< CCassConnection > &connection)
SSatInfoEntry GetResolverKeyspace() const
Get connection to resolver keyspace.
map< int32_t, SSatInfoEntry > m_BlobKeyspaces
optional< SSatInfoEntry > m_IPGKeyspace
optional< chrono::milliseconds > m_ResolveTimeout
string m_DefaultRegistrySection
static bool s_Resolve(const string &service, vector< pair< string, int >> &result, TSERV_Type serv_type=fSERV_Any)
const_iterator find(const key_type &key) const
Definition: map.hpp:153
Definition: map.hpp:338
bool empty() const
Definition: set.hpp:133
const_iterator find(const key_type &key) const
Definition: set.hpp:137
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
#define T(s)
Definition: common.h:230
@ eFatal
static const char * schema
Definition: stats.c:20
Int4 int32_t
Int2 int16_t
Int8 int64_t
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
int intptr_t
Definition: ncbitype.h:185
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
static string HostPortToString(unsigned int host, unsigned short port)
See SOCK_HostPortToString()
static unsigned int gethostbyname(const string &host, ESwitch log=eOff)
Return 0 on error.
static bool isip(const string &host, bool fullquad=false)
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3461
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3554
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fSplit_Truncate
Definition: ncbistr.hpp:2501
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
Definition: ncbistr.hpp:2498
int i
constexpr auto sort(_Init &&init)
unsigned int a
Definition: ncbi_localip.c:102
#define nullptr
Definition: ncbimisc.hpp:45
set< string > ReadSecureSatUsers(string const &keyspace, int32_t sat, shared_ptr< CCassConnection > connection, optional< chrono::milliseconds > timeout)
bool CanRetry(CCassandraException const &e, int retries)
constexpr TCassConsistency kSatInfoReadConsistency
string GetAddressString(string const &host, bool is_host)
size_t HashSatInfoData(vector< SSatInfoEntry > const &rows, string const &secure_registry_section, map< int32_t, set< string >> secure_users)
shared_ptr< CPSGMessages > ReadCassandraMessages(string const &keyspace, string const &domain, shared_ptr< CCassConnection > connection, optional< chrono::milliseconds > timeout)
constexpr int kSatInfoReadRetry
void hash_combine(size_t &seed, const T &v)
vector< SSatInfoEntry > ReadCassandraSatInfo(string const &keyspace, string const &domain, shared_ptr< CCassConnection > connection, optional< chrono::milliseconds > timeout)
shared_ptr< CCassConnection > MakeCassConnection(shared_ptr< IRegistry const > const &registry, string const &section, string const &service, bool reset_namespace)
string GetConnectionPointKey(string const &peer, int16_t port, string const &registry_section)
string GetServiceKey(string const &service, string const &registry_section)
ESatInfoRefreshSchemaResult
ESatInfoRefreshMessagesResult
ECassSchemaType
@ eIPGSchema
@ eBlobVer2Schema
@ eMaxSchema
@ eUnknownSchema
@ eResolverSchema
@ eBlobVer1Schema
@ eNamedAnnotationsSchema
static const char * prefix[]
Definition: pcregrep.c:405
#define row(bind, expected)
Definition: string_bind.c:73
static const char *const kChunkTableDefault
static const char *const kChunkTableBig
set< string > m_SecureUsers
ECassSchemaType schema_type
shared_ptr< CCassConnection > GetConnection() const
Get public satellite connection.
bool IsSecureSat() const
Is satellite requires secure access.
string ToString(string const &prefix) const
Get string representation for debug.
shared_ptr< CCassConnection > GetSecureConnection(string const &username) const
Get secure satellite connection.
shared_ptr< CCassConnection > connection
shared_ptr< CCassConnection > m_SecureConnection
static string query
static int seed
Definition: test_table.cpp:132
else result
Definition: token2.c:20
Modified on Wed May 22 11:34:02 2024 by modify_doxy.py rev. 669887