NCBI C++ ToolKit
blob_storage.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: blob_storage.cpp 100878 2023-09-22 15:56:05Z saprykin $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Sergey Satskiy
27  *
28  * File Description:
29  *
30  * The functionality not directly related to blob operations
31  *
32  */
33 
34 #include <ncbi_pch.hpp>
35 
37 #include <connect/ncbi_socket.hpp>
38 
42 
44 
// Out-of-class definitions of the SBlobStorageConstants string constants.
// Presumably the names of the Cassandra tables that hold regular and "big"
// blob chunks (inferred from the identifiers; confirm against the header).
45 const char* const SBlobStorageConstants::kChunkTableDefault = "blob_chunk";
46 const char* const SBlobStorageConstants::kChunkTableBig = "big_blob_chunk";
47 
49 
// Initial value of the retry counter used by the Read* helpers in this file.
// Their loops run the counter from this value down to 0, so a read is
// attempted at most kSatInfoReadRetry + 1 times.
51 constexpr int kSatInfoReadRetry{5};
52 
/// Tells whether a failed Cassandra read may be attempted again.
///
/// @param e        exception raised by the failed attempt
/// @param retries  number of attempts still left; must be positive to retry
/// @return true only when the parenthesized exception condition holds and
///         retries > 0
// NOTE(review): the operands of the parenthesized condition are not present in
// this copy of the file (lines are missing between "(" and ")"). Presumably it
// tests e.GetErrCode() against retryable error codes - confirm against the
// repository before relying on this description.
53 bool CanRetry(CCassandraException const& e, int retries)
54 {
55  return
56  (
59  )
60  && retries > 0;
61 }
62 
63 vector<SSatInfoEntry>
64 ReadCassandraSatInfo(string const& keyspace, string const& domain, shared_ptr<CCassConnection> connection)
65 {
66  vector<SSatInfoEntry> result;
67  for (int i = kSatInfoReadRetry; i >= 0; --i) {
68  try {
69  auto columns = connection->GetColumnNames(keyspace, "sat2keyspace");
70  bool flags_column_exists = find(cbegin(columns), cend(columns), "flags") != cend(columns);
71  string flags_cql = flags_column_exists ? ", flags" : "";
72  auto query = connection->NewQuery();
73  query->SetSQL(
74  "SELECT sat, keyspace_name, schema_type, service" + flags_cql + " FROM "
75  + keyspace + ".sat2keyspace WHERE domain = ?", 1);
76  query->BindStr(0, domain);
77  query->Query(kSatInfoReadConsistency, false, false);
78  while (query->NextRow() == ar_dataready) {
79  SSatInfoEntry row;
80  row.sat = query->FieldGetInt32Value(0);
81  row.keyspace = query->FieldGetStrValue(1);
82  row.schema_type = static_cast<ECassSchemaType>(query->FieldGetInt32Value(2));
83  row.service = query->FieldGetStrValueDef(3, "");
84  if (flags_column_exists) {
85  row.flags = query->FieldGetInt64Value(4, 0);
86  }
87  if (row.schema_type <= eUnknownSchema || row.schema_type > eMaxSchema) {
88  // ignoring
89  }
90  else {
91  result.push_back(row);
92  }
93  }
94  }
95  catch (CCassandraException const& e) {
96  if (!CanRetry(e, i)) {
97  throw;
98  }
99  }
100  break;
101  }
102 
103  sort(begin(result), end(result),
104  [](SSatInfoEntry const& a, SSatInfoEntry const& b)
105  {
106  return a.sat < b.sat;
107  }
108  );
109 
110  return result;
111 }
112 
113 shared_ptr<CPSGMessages>
114 ReadCassandraMessages(string const& keyspace, string const& domain, shared_ptr<CCassConnection> connection)
115 {
116  auto result = make_shared<CPSGMessages>();
117  for (int i = kSatInfoReadRetry; i >= 0; --i) {
118  try {
119  result->Clear();
120  auto query = connection->NewQuery();
121  query->SetSQL("SELECT name, value FROM " + keyspace + ".messages WHERE domain = ?", 1);
122  query->BindStr(0, domain);
123  query->Query(kSatInfoReadConsistency, false, false);
124  while (query->NextRow() == ar_dataready) {
125  result->Set(
126  query->FieldGetStrValue(0),
127  query->FieldGetStrValueDef(1, "")
128  );
129  }
130  break;
131  }
132  catch (CCassandraException const& e) {
133  if (!CanRetry(e, i)) {
134  throw;
135  }
136  }
137  }
138  return result;
139 }
140 
141 set<string> ReadSecureSatUsers(string const& keyspace, int32_t sat, shared_ptr<CCassConnection> connection)
142 {
144  for (int i = kSatInfoReadRetry; i >= 0; --i) {
145  try {
146  result.clear();
147  auto query = connection->NewQuery();
148  query->SetSQL("SELECT username FROM " + keyspace + ".web_user WHERE sat = ?", 1);
149  query->BindInt32(0, sat);
150  query->Query(kSatInfoReadConsistency, false, false);
151  while (query->NextRow() == ar_dataready) {
152  auto username = query->FieldGetStrValueDef(0, "");
153  if (!username.empty()) {
154  result.insert(username);
155  }
156  }
157  break;
158  }
159  catch (CCassandraException const& e) {
160  if (!CanRetry(e, i)) {
161  throw;
162  }
163  }
164  }
165  return result;
166 }
167 
168 string GetAddressString(string const& host, bool is_host)
169 {
170  if (is_host && !CSocketAPI::isip(host, false)) {
171  auto addr = CSocketAPI::gethostbyname(host);
172  if (addr == 0) {
173  return "";
174  }
175  return CSocketAPI::HostPortToString(addr, 0);
176  }
177  return host;
178 }
179 
/// Folds the std::hash value of @p v into the running hash @p seed.
///
/// @param seed  accumulator, updated in place
/// @param v     value whose hash is mixed in
template <class T>
inline void hash_combine(size_t& seed, const T& v)
{
    size_t const value_hash = std::hash<T>{}(v);
    // The mix uses the seed as it was before this call.
    size_t const mix = value_hash + 0x9e3779b9 + (seed << 6) + (seed >> 2);
    seed ^= mix;
}
186 
188  vector<SSatInfoEntry> const& rows,
189  string const& secure_registry_section,
190  map<int32_t, set<string>> secure_users
191 )
192 {
193  size_t result{0};
194  hash_combine(result, secure_registry_section);
195  for (auto const& row : rows) {
196  hash_combine(result, row.sat);
197  hash_combine(result, row.keyspace);
198  hash_combine(result, row.schema_type);
199  hash_combine(result, row.service);
200  hash_combine(result, row.flags);
201  if (row.IsSecureSat()) {
202  for (auto user: secure_users[row.sat]) {
203  hash_combine(result, user);
204  }
205  }
206  }
207  return result;
208 }
209 
210 
211 shared_ptr<CCassConnection>
213  shared_ptr<IRegistry const> const& registry,
214  string const& section,
215  string const& service,
216  bool reset_namespace
217 )
218 {
219  auto factory = CCassConnectionFactory::s_Create();
220  factory->LoadConfig(registry.get(), section);
221  if (!service.empty()) {
222  factory->SetServiceName(service);
223  }
224  if (reset_namespace) {
225  factory->SetDataNamespace("");
226  }
227  auto connection = factory->CreateInstance();
228  connection->Connect();
229  return connection;
230 }
231 
/// Builds the lookup key "<registry_section>|<service>".
inline string GetServiceKey(string const& service, string const& registry_section)
{
    string key(registry_section);
    key += "|";
    key += service;
    return key;
}
236 
/// Builds the lookup key "<registry_section>|<peer>:<port>".
inline string GetConnectionPointKey(string const& peer, int16_t port, string const& registry_section)
{
    string key(registry_section);
    key.append("|");
    key.append(peer);
    key.append(":");
    key.append(to_string(port));
    return key;
}
241 
242 END_SCOPE()
243 
244 optional<SSatInfoEntry> CSatInfoSchema::GetBlobKeyspace(int32_t sat) const
245 {
246  auto itr = m_BlobKeyspaces.find(sat);
247  if (
248  itr != cend(m_BlobKeyspaces)
249  && (
250  itr->second.schema_type == eBlobVer2Schema
251  || itr->second.schema_type == eNamedAnnotationsSchema
252  )
253  ) {
254  return itr->second;
255  }
256  return {};
257 }
258 
260 {
261  auto itr = crbegin(m_BlobKeyspaces);
262  return itr == crend(m_BlobKeyspaces) ? -1 : itr->first;
263 }
264 
265 vector<SSatInfoEntry> CSatInfoSchema::GetNAKeyspaces() const
266 {
267  return m_BioseqNaKeyspaces;
268 }
269 
271 {
272  return m_ResolverKeyspace;
273 }
274 
275 optional<SSatInfoEntry> CSatInfoSchema::GetIPGKeyspace() const
276 {
277  return m_IPGKeyspace;
278 }
279 
280 shared_ptr<CCassConnection> CSatInfoSchema::x_GetConnectionByService(string const& service, string const& registry_section) const
281 {
282  auto itr = m_Service2Cluster.find(GetServiceKey(service, registry_section));
283  return itr == cend(m_Service2Cluster) ? nullptr : itr->second;
284 }
285 
286 shared_ptr<CCassConnection> CSatInfoSchema::x_GetConnectionByConnectionPoint(string const& connection_point) const
287 {
288  auto itr = m_Point2Cluster.find(connection_point);
289  return itr == cend(m_Point2Cluster) ? nullptr : itr->second;
290 }
291 
292 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_AddConnection(
293  shared_ptr<CCassConnection> const& connection,
294  string const& registry_section,
295  string const& service,
296  bool is_default
297 )
298 {
299  for (auto peer : connection->GetLocalPeersAddressList("")) {
300  m_Point2Cluster[GetConnectionPointKey(peer, connection->GetPort(), registry_section)] = connection;
301  }
302  if (is_default) {
303  m_DefaultConnection = connection;
304  m_DefaultRegistrySection = registry_section;
305  }
306  else {
307  m_Service2Cluster[GetServiceKey(service, registry_section)] = connection;
308  }
309  return {};
310 }
311 
/// Translates a service name (or an explicit host list) into connection point
/// keys.
///
/// @param service            LBSM service name, or a host list; the value is
///                           treated as a host list when it contains ':',
///                           ' ' or ','
/// @param registry_section   registry section that becomes part of every key
/// @param connection_points  output; cleared first, then filled with the
///                           results of GetConnectionPointKey()
/// @return empty optional on the visible success path
// NOTE(review): several lines of this function are missing from this copy:
// the bodies of the three empty `if (...empty())` blocks (presumably early
// returns with an error code) and the statement that fills `items` from
// `hosts`. Confirm against the repository.
312 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_ResolveServiceName(
313  string const& service, string const& registry_section, vector<string>& connection_points)
314 {
315  connection_points.clear();
// Instantiating a CConnIniter-derived object in a nested scope; presumably done
// for the initialization side effects of its constructor - TODO confirm.
316  {
317  class CInPlaceConnIniter : protected CConnIniter
318  {} conn_initer; /*NCBI_FAKE_WARNING*/
319  }
320
// A ':' , ' ' or ',' in the value marks it as an explicit host list.
321  bool is_hostlist = (service.find(':') != string::npos)
322  || (service.find(' ') != string::npos)
323  || (service.find(',') != string::npos);
324
325  string hosts;
326  if (!is_hostlist) {
// NOTE(review): the log messages below name x_AddClusterByServiceName, which is
// not the name of this function.
327  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName uses service name: '" << service << "'");
328  hosts = LbsmLookup::s_Resolve(service, ',');
329  if (hosts.empty()) {
330  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName failed to resolve LBSM service name: '" << service << "'");
332  }
333  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName resolved service name: '" << service << "' => '" << hosts << "'");
334  }
335  else {
336  ERR_POST(Info << "CSatInfoSchema::x_AddClusterByServiceName uses host list: '" << service << "'");
337  hosts = service;
338  }
339
340  vector<string> items;
342  for (auto item : items) {
343  string item_host;
344  string item_port_token;
// "host:port" form: a port that parses to 0 falls back to kCassDefaultPort.
345  if (NStr::SplitInTwo(item, ":", item_host, item_port_token)) {
346  int16_t item_port = NStr::StringToNumeric<short>(item_port_token, NStr::fConvErr_NoThrow);
347  item_port = item_port ? item_port : CCassConnection::kCassDefaultPort;
348  item_host = GetAddressString(item_host, is_hostlist);
349  if (item_host.empty()) {
351  }
352  connection_points.push_back(GetConnectionPointKey(item_host, item_port, registry_section));
353  }
354  else {
// NOTE(review): the resolved address is stored in `item`, but the key below is
// built from `item_host`. Verify which variable is intended.
355  item = GetAddressString(item, is_hostlist);
356  if (item.empty()) {
358  }
359  connection_points.push_back(
360  GetConnectionPointKey(item_host, CCassConnection::kCassDefaultPort, registry_section)
361  );
362  }
363  }
364  return {};
365 }
366 
367 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_ResolveConnectionByServiceName(
368  string service,
369  shared_ptr<CSatInfoSchema> const& old_schema,
370  shared_ptr<IRegistry const> const& registry,
371  string const& registry_section,
372  shared_ptr<CCassConnection>& connection
373 )
374 {
375  // Check this schema data
376  if (service.empty()) {
377  if (registry_section == m_DefaultRegistrySection) {
378  connection = m_DefaultConnection;
379  return {};
380  }
381  else {
382  service = registry->Get(registry_section, "service");
383  }
384  }
385  if (service.empty() && registry_section == m_DefaultRegistrySection) {
386  connection = m_DefaultConnection;
387 
388  }
389  connection = x_GetConnectionByService(service, registry_section);
390  if (connection) {
391  return {};
392  }
393  vector<string> connection_points;
394  auto result = x_ResolveServiceName(service, registry_section, connection_points);
395  if (result.has_value()) {
396  return result;
397  }
398  for (auto const& connection_point : connection_points) {
399  connection = x_GetConnectionByConnectionPoint(connection_point);
400  if (connection) {
401  m_Service2Cluster.emplace(GetServiceKey(service, registry_section), connection);
402  return {};
403  }
404  }
405 
406  // Check previous schema version
407  if (old_schema) {
408  connection = old_schema->x_GetConnectionByService(service, registry_section);
409  if (connection) {
410  x_AddConnection(connection, registry_section, service, false);
411  return {};
412  }
413  for (auto const& connection_point : connection_points) {
414  connection = old_schema->x_GetConnectionByConnectionPoint(connection_point);
415  if (connection) {
416  x_AddConnection(connection, registry_section, service, false);
417  return {};
418  }
419  }
420  }
421 
422  // Make NEW connection
423  {
424  connection = MakeCassConnection(registry, registry_section, service, true);
425  x_AddConnection(connection, registry_section, service, false);
426  return {};
427  }
428 
429  return {};
430 }
431 
/// Adds one sat2keyspace entry to this schema.
///
/// Resolves the connection for entry.service, attaches it to the entry (as
/// secure or regular connection depending on entry.IsSecureSat()), records the
/// secure users, and stores the entry in the member that matches its
/// schema_type.
///
/// @param entry             row to add (taken by value, moved into storage)
/// @param old_schema        previous schema version used for connection reuse
/// @param registry          registry used to resolve / create connections
/// @param registry_section  registry section used for the connection
/// @param secure_users      users allowed for this sat; ignored when empty
/// @return error from connection resolution, or an empty optional on the
///         visible success path
// NOTE(review): this copy of the file is missing a few lines of the function:
// the bodies of two `if` blocks (presumably error returns) and the case label
// in front of the branch that fills both m_BlobKeyspaces and
// m_BioseqNaKeyspaces (presumably eNamedAnnotationsSchema). Confirm against
// the repository.
432 optional<ESatInfoRefreshSchemaResult> CSatInfoSchema::x_AddSatInfoEntry(
433  SSatInfoEntry entry,
434  shared_ptr<CSatInfoSchema> const& old_schema,
435  shared_ptr<IRegistry const> const& registry,
436  string const& registry_section,
437  set<string> const& secure_users
438 )
439 {
440  shared_ptr<CCassConnection> connection;
441  auto result = x_ResolveConnectionByServiceName(entry.service, old_schema, registry, registry_section, connection);
442  if (result.has_value()) {
443  return result;
444  }
445  if (!secure_users.empty()) {
446  entry.m_SecureUsers = secure_users;
447  m_SecureSatUsers[entry.sat] = secure_users;
448  }
// Exactly one of the two connection fields is set, the other one is nulled.
449  if (entry.IsSecureSat()) {
450  entry.connection = nullptr;
451  entry.m_SecureConnection = move(connection);
452  }
453  else {
454  entry.connection = move(connection);
455  entry.m_SecureConnection = nullptr;
456  }
457
458  // Temporary restriction. Until PSG needs/supports those types of secure satellites
459  if (entry.IsSecureSat() && entry.schema_type != eBlobVer2Schema)
460  {
462  }
463  switch(entry.schema_type) {
464  case eResolverSchema: {
// A resolver keyspace is already stored when its keyspace name is not empty.
465  if (!m_ResolverKeyspace.keyspace.empty()) {
467  }
468  m_ResolverKeyspace = move(entry);
469  break;
470  }
// The entry is copied into m_BlobKeyspaces first, then moved into the vector.
472  m_BlobKeyspaces.emplace(entry.sat, entry);
473  m_BioseqNaKeyspaces.push_back(move(entry));
474  break;
475  }
476  case eBlobVer1Schema:
477  case eBlobVer2Schema:
478  {
479  m_BlobKeyspaces.emplace(entry.sat, move(entry));
480  break;
481  }
482  case eIPGSchema:
483  {
484  m_IPGKeyspace = make_optional(move(entry));
485  break;
486  }
487  case eUnknownSchema: // LCOV_EXCL_LINE
488  break; // LCOV_EXCL_LINE
489  }
490  return {};
491 }
492 
493 string SSatInfoEntry::ToString(string const& prefix) const
494 {
495  stringstream s;
496  for (auto user : m_SecureUsers) {
497  s << "\n" << prefix << " - '" << user << "'";
498  }
499 
500  string secure_users = s.str();
501  secure_users = secure_users.empty() ? "{}" : secure_users;
502  return prefix + "sat: " + to_string(sat)
503  + ", schema: "+ to_string(static_cast<int>(schema_type))
504  + ", keyspace: '" + keyspace + "'"
505  + ", service: '" + service + "'"
506  + ", flags: " + to_string(flags)
507  + ", connection: &" + to_string(reinterpret_cast<intptr_t>(connection.get()))
508  + ", secure_connection: &" + to_string(reinterpret_cast<intptr_t>(m_SecureConnection.get()))
509  + ", secure_users: " + secure_users;
510 }
511 
512 shared_ptr<CCassConnection> SSatInfoEntry::GetSecureConnection(string const& username) const
513 {
514  if (!IsSecureSat()) {
515  NCBI_THROW(CCassandraException, eGeneric, "SSatInfoEntry::GetSecureConnection() should not be called for NON secure satellites");
516  }
517  if (username.empty() || m_SecureUsers.empty()) {
518  return nullptr;
519  }
520  auto user_entry = m_SecureUsers.find(username);
521  bool user_allowed_to_read = user_entry != cend(m_SecureUsers);
522  if (user_allowed_to_read) {
523  return m_SecureConnection;
524  }
525  else {
526  return nullptr;
527  }
528 }
529 
530 shared_ptr<CCassConnection> SSatInfoEntry::GetConnection() const
531 {
532  if (IsSecureSat()) {
533  NCBI_THROW(CCassandraException, eGeneric, "SSatInfoEntry::GetConnection() should not be called for secure satellites");
534  }
535  return this->connection;
536 }
537 
539 {
540  stringstream s;
541  s << "Blob keyspaces: \n";
542  for (auto sat : m_BlobKeyspaces) {
543  s << sat.second.ToString(" ") << "\n";
544  }
545  s << "BioseqNA keyspaces: \n";
546  for (auto sat : m_BioseqNaKeyspaces) {
547  s << sat.ToString(" ") << "\n";
548  }
549  s << "Resolver keyspace: \n";
550  s << m_ResolverKeyspace.ToString(" ") << "\n";
551  s << "IPG keyspace: \n";
552  s << ((m_IPGKeyspace) ? m_IPGKeyspace->ToString(" ") : " None") << "\n";
553  s << "Secure sat users: \n";
554  for (auto sat_users : m_SecureSatUsers) {
555  s << " " << sat_users.first << "\n";
556  for (auto user : sat_users.second) {
557  s << " - '" << user << "'\n";
558  }
559  }
560  s << "Default connection: &" << to_string(reinterpret_cast<intptr_t>(m_DefaultConnection.get())) << "\n";
561  s << "Default registry section: '" << m_DefaultRegistrySection << "'\n";
562  s << "m_Point2Cluster: \n";
563  for (auto item : m_Point2Cluster) {
564  s << " " << item.first << " : &" << to_string(reinterpret_cast<intptr_t>(item.second.get())) << "\n";
565  }
566  s << "m_Service2Cluster: \n";
567  for (auto item : m_Service2Cluster) {
568  s << " " << item.first << " : &" << to_string(reinterpret_cast<intptr_t>(item.second.get())) << "\n";
569  }
570  return s.str();
571 }
572 
574  string const& sat_info_keyspace,
575  string const& domain,
576  shared_ptr<CCassConnection> sat_info_connection,
577  shared_ptr<IRegistry const> registry,
578  string const& registry_section
579 )
580  : m_SatInfoKeyspace(sat_info_keyspace)
581  , m_Domain(domain)
582  , m_SatInfoConnection(move(sat_info_connection))
583  , m_Registry(move(registry))
584  , m_RegistrySection(registry_section)
585 {
586  if (m_SatInfoConnection == nullptr) {
587  NCBI_THROW(CCassandraException, eFatal, "CSatInfoSchemaProvider() Cassandra connection should not be nullptr");
588  }
589 }
590 
591 void CSatInfoSchemaProvider::SetSatInfoConnection(shared_ptr<CCassConnection> sat_info_connection)
592 {
593  atomic_store(&m_SatInfoConnection, move(sat_info_connection));
594 }
595 
596 shared_ptr<CCassConnection> CSatInfoSchemaProvider::x_GetSatInfoConnection() const
597 {
598  return atomic_load(&m_SatInfoConnection);
599 }
600 
601 optional<SSatInfoEntry> CSatInfoSchemaProvider::GetBlobKeyspace(int32_t sat) const
602 {
603  auto p = GetSchema();
604  return p ? p->GetBlobKeyspace(sat) : nullopt;
605 }
606 
607 vector<SSatInfoEntry> CSatInfoSchemaProvider::GetNAKeyspaces() const
608 {
609  auto p = GetSchema();
610  return p ? p->GetNAKeyspaces() : vector<SSatInfoEntry>();
611 }
612 
614 {
615  auto p = GetSchema();
616  return p ? p->GetResolverKeyspace() : SSatInfoEntry();
617 }
618 
619 optional<SSatInfoEntry> CSatInfoSchemaProvider::GetIPGKeyspace() const
620 {
621  auto p = GetSchema();
622  return p ? p->GetIPGKeyspace() : nullopt;
623 }
624 
626 {
627  auto p = GetSchema();
628  return p ? p->GetMaxBlobKeyspaceSat() : -1;
629 }
630 
631 string CSatInfoSchemaProvider::GetMessage(string const& name) const
632 {
633  auto p = GetMessages();
634  return p ? p->Get(name) : "";
635 }
636 
637 shared_ptr<CSatInfoSchema> CSatInfoSchemaProvider::GetSchema() const
638 {
639  return atomic_load(&m_SatInfoSchema);
640 }
641 
642 shared_ptr<CPSGMessages> CSatInfoSchemaProvider::GetMessages() const
643 {
644  return atomic_load(&m_SatInfoMessages);
645 }
646 
// Body of ESatInfoRefreshSchemaResult CSatInfoSchemaProvider::RefreshSchema(bool apply):
// refreshes the configuration read from the sat2keyspace table.
//
// Visible flow: validate m_SatInfoKeyspace, check that `rows` is not empty,
// read secure users for secure satellites (only when
// m_SecureSatRegistrySection is set), hash the data and compare with
// m_SatInfoHash, then build a new CSatInfoSchema, publish it with
// atomic_store() and remember the new hash.
//
// NOTE(review): the function header, the declaration of `rows` and every
// `return` of a result code are missing from this copy of the file (the empty
// `if` / `else if` bodies and the end of the function). Presumably `rows`
// comes from ReadCassandraSatInfo() - confirm against the repository.
648 {
649  if (m_SatInfoKeyspace.empty()) {
650  x_SetRefreshErrorMessage("mapping_keyspace is not specified");
652  }
654  if (rows.empty()) {
655  x_SetRefreshErrorMessage(m_SatInfoKeyspace + ".sat2keyspace info is empty");
657  }
658  map<int32_t, set<string>> secure_users;
659  if (!m_SecureSatRegistrySection.empty()) {
// A separate connection is made from the secure registry section; the users
// are read from the keyspace reported by that connection.
660  auto secure_connection = MakeCassConnection(m_Registry, m_SecureSatRegistrySection, "", false);
661  for (auto sat_info: rows) {
662  if (sat_info.IsSecureSat()) {
663  secure_users[sat_info.sat] = ReadSecureSatUsers(secure_connection->Keyspace(), sat_info.sat, secure_connection);
664  }
665  }
666  }
// The hash is used to detect whether the configuration differs from the one
// applied last time.
667  auto rows_hash = HashSatInfoData(rows, m_SecureSatRegistrySection, secure_users);
668  if (rows_hash == m_SatInfoHash) {
670  }
671  else if (!apply) {
673  }
674  auto schema = make_shared<CSatInfoSchema>();
675  auto old_schema = GetSchema();
676  auto result = x_PopulateNewSchema(schema, old_schema, move(rows), move(secure_users));
677  if (result.has_value()) {
678  return result.value();
679  }
// The new schema is published and the hash updated only after it has been
// populated without an error result.
680  atomic_store(&m_SatInfoSchema, move(schema));
681  m_SatInfoHash = rows_hash;
683 }
684 
/// Fills @p new_schema from the sat2keyspace rows.
///
/// Registers the sat_info connection as the default connection, adds every
/// row through CSatInfoSchema::x_AddSatInfoEntry() and finally checks that a
/// resolver keyspace and at least one blob keyspace are present. Failures are
/// described through x_SetRefreshErrorMessage().
///
/// @param new_schema    schema being built
/// @param old_schema    previous schema, passed on for connection reuse
/// @param sat_info      rows to add
/// @param secure_users  sat id -> users allowed for that secure satellite
/// @return error code of the first failure, or an empty optional on the
///         visible success path
// NOTE(review): the `case` labels of the switch, part of the final resolver
// condition and the `return` statements after the last two error messages are
// missing from this copy of the file. Confirm against the repository.
685 optional<ESatInfoRefreshSchemaResult> CSatInfoSchemaProvider::x_PopulateNewSchema(
686  shared_ptr<CSatInfoSchema>& new_schema,
687  shared_ptr<CSatInfoSchema> const& old_schema,
688  vector<SSatInfoEntry>&& sat_info,
689  map<int32_t, set<string>>&& secure_users
690 )
691 {
692  auto result = new_schema->x_AddConnection(x_GetSatInfoConnection(), m_RegistrySection, "", true);
693  if (result.has_value()) {
694  return result.value();
695  }
696  for (auto& entry : sat_info) {
// Secure satellites use the secure registry section when one is configured.
697  string registry_section = m_RegistrySection;
698  if (entry.IsSecureSat() && !m_SecureSatRegistrySection.empty()) {
699  registry_section = m_SecureSatRegistrySection;
700  }
// operator[] yields an empty set for satellites without stored users.
701  auto sat_secure_users = secure_users[entry.sat];
702  auto result = new_schema->x_AddSatInfoEntry(entry, old_schema, m_Registry, registry_section, sat_secure_users);
703  if (result.has_value()) {
// Translate the error code into a refresh error message, then return the code.
704  switch(result.value()) {
706  x_SetRefreshErrorMessage("More than one resolver keyspace in the " +
707  m_SatInfoKeyspace + ".sat2keyspace table");
708  break;
710  x_SetRefreshErrorMessage("Cannot resolve service name: '" + entry.service + "'");
711  break;
713  x_SetRefreshErrorMessage("Satellite with id (" + to_string(entry.sat) + ") cannot be configured as secure");
714  break;
715  default:
716  x_SetRefreshErrorMessage("Unexpected result for SatInfoEntry processing: "
717  + to_string(static_cast<int64_t>(result.value())));
718  }
719  return result.value();
720  }
721  }
722  if (
724  (new_schema->m_ResolverKeyspace.keyspace.empty() || !new_schema->m_ResolverKeyspace.connection)
725  ) {
726  x_SetRefreshErrorMessage("resolver schema is not found in sat2keyspace");
728  }
// GetMaxBlobKeyspaceSat() is compared with -1 to detect that no blob keyspace was added.
729  if (new_schema->GetMaxBlobKeyspaceSat() == -1) {
730  x_SetRefreshErrorMessage("sat2keyspace is incomplete");
732  }
733  return {};
734 }
735 
// Body of ESatInfoRefreshMessagesResult CSatInfoSchemaProvider::RefreshMessages(bool apply):
// refreshes the messages read from the messages table.
//
// Visible flow: validate m_SatInfoKeyspace, check that `messages` is not
// empty, compare with the currently published messages, then publish the new
// ones with atomic_store().
//
// NOTE(review): the function header, the declaration of `messages` and every
// `return` of a result code are missing from this copy of the file (the empty
// `if` / `else if` bodies and the end of the function). Presumably `messages`
// comes from ReadCassandraMessages() - confirm against the repository.
737 {
738  if (m_SatInfoKeyspace.empty()) {
739  x_SetRefreshErrorMessage("mapping_keyspace is not specified");
741  }
743  if (messages->IsEmpty()) {
744  x_SetRefreshErrorMessage(m_SatInfoKeyspace + ".messages info is empty");
746  }
747
748  auto old_messages = GetMessages();
// The comparison is by content (operator== on the pointed-to objects), not by pointer.
749  if (old_messages && *old_messages == *messages) {
751  }
752  else if (!apply) {
754  }
755  atomic_store(&m_SatInfoMessages, move(messages));
757 }
758 
760 {
761  auto p = atomic_load(&m_RefreshErrorMessage);
762  return p ? *p : "";
763 }
764 
766 {
767  auto msg = make_shared<string>(message);
768  atomic_store(&m_RefreshErrorMessage, move(msg));
769 }
770 
771 
#define END_IDBLOB_SCOPE
Definition: IdCassScope.hpp:40
#define BEGIN_IDBLOB_SCOPE
Definition: IdCassScope.hpp:39
CassConsistency TCassConsistency
Definition: cass_driver.hpp:98
@ ar_dataready
Definition: cass_driver.hpp:70
static shared_ptr< CCassConnectionFactory > s_Create()
static constexpr int16_t kCassDefaultPort
static constexpr TCassConsistency kLocalQuorum
Helper hook-up class that installs default logging/registry/locking (but only if they have not yet be...
shared_ptr< CPSGMessages > m_SatInfoMessages
ESatInfoRefreshMessagesResult RefreshMessages(bool apply)
Refresh information for messages database {sat_info3.messages}.
optional< SSatInfoEntry > GetBlobKeyspace(int32_t sat) const
Get blob keyspace connection by sat id.
shared_ptr< CSatInfoSchema > GetSchema() const
Get configuration schema snapshot.
ESatInfoRefreshSchemaResult RefreshSchema(bool apply)
Refresh information for configuration database {sat_info3.sat2keyspace}.
CSatInfoSchemaProvider(string const &sat_info_keyspace, string const &domain, shared_ptr< CCassConnection > sat_info_connection, shared_ptr< IRegistry const > registry, string const &registry_section)
vector< SSatInfoEntry > GetNAKeyspaces() const
Get list of BioseqNA keyspaces connections.
shared_ptr< CCassConnection > x_GetSatInfoConnection() const
shared_ptr< CPSGMessages > GetMessages() const
Get messages snapshot.
void SetSatInfoConnection(shared_ptr< CCassConnection > sat_info_connection)
Changes Cassandra connection used to communicate with sat_info3.
string GetMessage(string const &name) const
Get configured message by name.
SSatInfoEntry GetResolverKeyspace() const
Get connection to resolver keyspace.
shared_ptr< IRegistry const > m_Registry
shared_ptr< string > m_RefreshErrorMessage
shared_ptr< CSatInfoSchema > m_SatInfoSchema
string GetRefreshErrorMessage() const
Get detailed message for last refresh operation (common for RefreshSchema and RefreshMessages).
shared_ptr< CCassConnection > m_SatInfoConnection
void x_SetRefreshErrorMessage(string const &message)
optional< ESatInfoRefreshSchemaResult > x_PopulateNewSchema(shared_ptr< CSatInfoSchema > &new_schema, shared_ptr< CSatInfoSchema > const &old_schema, vector< SSatInfoEntry > &&sat_info, map< int32_t, set< string >> &&secure_users)
int32_t GetMaxBlobKeyspaceSat() const
Get max id value for existing blob sat.
optional< SSatInfoEntry > GetIPGKeyspace() const
Get connection to IPG keyspace.
map< string, shared_ptr< CCassConnection > > m_Service2Cluster
shared_ptr< CCassConnection > x_GetConnectionByService(string const &service, string const &registry_section) const
optional< ESatInfoRefreshSchemaResult > x_AddConnection(shared_ptr< CCassConnection > const &connection, string const &registry_section, string const &service, bool is_default)
map< string, shared_ptr< CCassConnection > > m_Point2Cluster
optional< SSatInfoEntry > GetIPGKeyspace() const
Get connection to IPG keyspace.
optional< ESatInfoRefreshSchemaResult > x_AddSatInfoEntry(SSatInfoEntry entry, shared_ptr< CSatInfoSchema > const &old_schema, shared_ptr< IRegistry const > const &registry, string const &registry_section, set< string > const &secure_users)
optional< ESatInfoRefreshSchemaResult > x_ResolveServiceName(string const &service, string const &registry_section, vector< string > &connection_points)
int32_t GetMaxBlobKeyspaceSat() const
Get max id value for existing blob sat.
vector< SSatInfoEntry > GetNAKeyspaces() const
Get list of BioseqNA keyspaces connections.
shared_ptr< CCassConnection > m_DefaultConnection
map< int32_t, set< string > > m_SecureSatUsers
shared_ptr< CCassConnection > x_GetConnectionByConnectionPoint(string const &connection_point) const
SSatInfoEntry m_ResolverKeyspace
vector< SSatInfoEntry > m_BioseqNaKeyspaces
string ToString() const
Print internal state of CSatInfoSchema.
optional< ESatInfoRefreshSchemaResult > x_ResolveConnectionByServiceName(string service, shared_ptr< CSatInfoSchema > const &old_schema, shared_ptr< IRegistry const > const &registry, string const &registry_section, shared_ptr< CCassConnection > &connection)
SSatInfoEntry GetResolverKeyspace() const
Get connection to resolver keyspace.
map< int32_t, SSatInfoEntry > m_BlobKeyspaces
optional< SSatInfoEntry > m_IPGKeyspace
string m_DefaultRegistrySection
static bool s_Resolve(const string &service, vector< pair< string, int >> &result, TSERV_Type serv_type=fSERV_Any)
const_iterator find(const key_type &key) const
Definition: map.hpp:153
Definition: map.hpp:338
bool empty() const
Definition: set.hpp:133
const_iterator find(const key_type &key) const
Definition: set.hpp:137
static CMemoryRegistry registry
Definition: cn3d_tools.cpp:81
#define T(s)
Definition: common.h:230
@ eFatal
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
TErrCode GetErrCode(void) const
Get error code.
Definition: ncbiexpt.cpp:453
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
int intptr_t
Definition: ncbitype.h:185
virtual const string & Get(const string &section, const string &name, TFlags flags=0) const
Get the parameter value.
Definition: ncbireg.cpp:262
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
static string HostPortToString(unsigned int host, unsigned short port)
See SOCK_HostPortToString()
static unsigned int gethostbyname(const string &host, ESwitch log=eOff)
Return 0 on error.
static bool isip(const string &host, bool fullquad=false)
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3457
static bool SplitInTwo(const CTempString str, const CTempString delim, string &str1, string &str2, TSplitFlags flags=0)
Split a string into two pieces using the specified delimiters.
Definition: ncbistr.cpp:3550
@ fConvErr_NoThrow
Do not throw an exception on error.
Definition: ncbistr.hpp:285
@ fSplit_Truncate
Definition: ncbistr.hpp:2501
@ fSplit_MergeDelimiters
Merge adjacent delimiters.
Definition: ncbistr.hpp:2498
int i
constexpr auto sort(_Init &&init)
unsigned int a
Definition: ncbi_localip.c:102
#define nullptr
Definition: ncbimisc.hpp:45
vector< SSatInfoEntry > ReadCassandraSatInfo(string const &keyspace, string const &domain, shared_ptr< CCassConnection > connection)
shared_ptr< CPSGMessages > ReadCassandraMessages(string const &keyspace, string const &domain, shared_ptr< CCassConnection > connection)
bool CanRetry(CCassandraException const &e, int retries)
constexpr TCassConsistency kSatInfoReadConsistency
string GetAddressString(string const &host, bool is_host)
size_t HashSatInfoData(vector< SSatInfoEntry > const &rows, string const &secure_registry_section, map< int32_t, set< string >> secure_users)
constexpr int kSatInfoReadRetry
set< string > ReadSecureSatUsers(string const &keyspace, int32_t sat, shared_ptr< CCassConnection > connection)
void hash_combine(size_t &seed, const T &v)
shared_ptr< CCassConnection > MakeCassConnection(shared_ptr< IRegistry const > const &registry, string const &section, string const &service, bool reset_namespace)
string GetConnectionPointKey(string const &peer, int16_t port, string const &registry_section)
string GetServiceKey(string const &service, string const &registry_section)
ESatInfoRefreshSchemaResult
ESatInfoRefreshMessagesResult
ECassSchemaType
@ eIPGSchema
@ eBlobVer2Schema
@ eMaxSchema
@ eUnknownSchema
@ eResolverSchema
@ eBlobVer1Schema
@ eNamedAnnotationsSchema
static const char * prefix[]
Definition: pcregrep.c:405
static const char * schema
Definition: stats.c:20
signed short int16_t
Definition: stdint.h:122
signed __int64 int64_t
Definition: stdint.h:135
signed int int32_t
Definition: stdint.h:123
static const char *const kChunkTableDefault
static const char *const kChunkTableBig
set< string > m_SecureUsers
ECassSchemaType schema_type
shared_ptr< CCassConnection > GetConnection() const
Get public satellite connection.
bool IsSecureSat() const
Whether the satellite requires secure access.
string ToString(string const &prefix) const
Get string representation for debug.
shared_ptr< CCassConnection > GetSecureConnection(string const &username) const
Get secure satellite connection.
shared_ptr< CCassConnection > connection
shared_ptr< CCassConnection > m_SecureConnection
static string query
static int seed
Definition: test_table.cpp:132
else result
Definition: token2.c:20
Modified on Thu Nov 30 04:56:30 2023 by modify_doxy.py rev. 669887