NCBI C++ ToolKit
data_crawler_cache.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: data_crawler_cache.cpp 46707 2021-09-09 21:01:01Z rudnev $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Vladislav Evgeniev
27  *
28  * File Description:
29  *
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
35 #include <corelib/ncbiapp.hpp>
36 #include <corelib/ncbireg.hpp>
37 #include <corelib/rwstream.hpp>
39 #include <util/md5.hpp>
40 #include <chrono>
41 #include <type_traits>
42 #include <system_error>
43 
44 
45 
47 
49 {
50  bool blob_present = m_BlobCache.HasBlob(m_BlobKey, "");
51  bool sync_present = m_SyncCache.HasBlob(m_SyncKey, "");
52  LOG_POST(Info << "Blob: " << m_BlobKey << " : " << blob_present << " Sync: " << sync_present);
53  if (blob_present) {
54  if (!sync_present)
55  return eState_Created;
56  }
57  if (sync_present)
58  return x_ReadState();
59 
60  return eState_None;
61 }
62 
64 {
65  ECacheState actual_state = GetState();
66  switch(desired_state) {
67  case eState_None:
68  {
70  {
71  std::lock_guard<std::mutex> guard(m_SyncCacheMutex);
73  }
74  actual_state = eState_None;
75  break;
76  }
77  case eState_InProgress:
78  {
79  if (actual_state != eState_None)
80  return actual_state;
83  actual_state = eState_InProgress;
84  break;
85  }
87  {
90  actual_state = eState_NotAccessible;
91  break;
92  }
93  case eState_Created:
94  {
96  {
97  std::lock_guard<std::mutex> guard(m_SyncCacheMutex);
99  }
100  actual_state = eState_Created;
101  break;
102  }
103  }
104  return actual_state;
105 }
106 
108 {
109  return (m_BlobCache.GetSize(m_BlobKey, 0, ""));
110 }
111 
112 unique_ptr<CNcbiIstream> CDataCrawlerCache::GetReadStream()
113 {
114  unique_ptr<IReader> reader(m_BlobCache.GetReadStream(m_BlobKey, 0, ""));
115  return unique_ptr<CNcbiIstream>(new CRStream(reader.release(), 0, 0, CRWStreambuf::fOwnReader) );
116 }
117 
118 unique_ptr<CNcbiOstream> CDataCrawlerCache::GetWriteStream()
119 {
120  unique_ptr<IWriter> writer(m_BlobCache.GetWriteStream(m_BlobKey, 0, ""));
121  return unique_ptr<CNcbiOstream>(new CWStream(writer.release(), 0, 0, CRWStreambuf::fOwnWriter) );
122 }
123 
125 {
126  m_BlobCache.Remove(m_BlobKey, 0, "");
127 }
128 
130 {
136 }
137 
139 {
140  string key = x_GetCacheKey();
141  const size_t max_key { 120 };
142  if (!m_isImmutableKey && key.length() > max_key) { // Shorten key due to NC limits
143  const size_t flangs_size { 50 };
144  string left = key.substr(0, flangs_size);
145  string right = key.substr(key.length() - flangs_size - 1);
146  CMD5 md5;
147  md5.Update(key.c_str(), key.length());
148  key = left;
149  key += '_';
150  key += md5.GetHexSum();
151  key += '_';
152  key += right;
153  }
154  switch(type)
155  {
156  case eCache_Blob :
157  key += "_VcfBlob";
158  break;
159  case eCache_Sync :
160  key += "_VcfSync";
161  break;
162  }
163  return key;
164 }
165 
167 {
168  try {
169  LOG_POST(Info << "x_StartHeartbeat");
170  m_StopHeartbeat = false;
171  m_Heartbeat.reset(new thread(&CDataCrawlerCache::x_Heartbeat, this));
172  }
173  catch(const std::system_error& e) {
174  LOG_POST(Error << "Failed to start the cache heartbeat thread: " << e.what());
175  NCBI_THROW(CException, eUnknown, "Failed to start the heartbeat thread.");
176  }
177 }
178 
180 {
181  if (!m_Heartbeat)
182  return;
183  LOG_POST(Info << "x_StopHeartbeat");
184  try {
185  m_StopHeartbeat = true;
186  m_Heartbeat->join();
187  m_Heartbeat.reset(nullptr);
188  }
189  catch(const std::system_error& e) {
190  LOG_POST(Error << "Failed to join the cache heartbeat thread: " << e.what());
191  }
192 }
193 
195 {
196  try {
197  while(!m_StopHeartbeat) {
198  this_thread::sleep_for(chrono::seconds(2));
200  }
201  }
202  catch(const CException& e) {
203  LOG_POST(Error << "x_Heartbeat failed: " << e);
204  }
205 }
206 
207 CNetICacheClient CDataCrawlerCache::x_InitICache(const string &cache, const string &service, const string &db)
208 {
210  string cache_svc = reg.GetString(cache, "service", service);
211  string cache_name = reg.GetString(cache, "cache", db);
212  LOG_POST(Info << "x_InitICache, service: " << cache_svc << ", cache: " << cache_name);
213  return CNetICacheClient(cache_svc, cache_name, "sviewer");
214 }
215 
217 {
218  std::lock_guard<std::mutex> guard(m_SyncCacheMutex);
219  underlying_type_t< ECacheState > result { eState_None };
220  m_SyncCache.Read(m_SyncKey, 0, "", &result, sizeof(result));
221  return static_cast< ECacheState >( result );
222 }
223 
224 inline void CDataCrawlerCache::x_WriteState(ECacheState state, unsigned int ttl)
225 {
226  std::lock_guard<std::mutex> guard(m_SyncCacheMutex);
227  underlying_type_t< ECacheState > st { state };
228  m_SyncCache.Store(m_SyncKey, 0, "", &st, sizeof(st), ttl);
229 }
230 
232 {
235 }
236 
238 {
239  x_StopHeartbeat();
240 }
241 
virtual CNetICacheClient x_InitBlobCache()=0
Creates the BLOB NetCache instance.
ECacheState SetState(ECacheState desired_state)
Changes the state of the cache object.
void x_WriteState(ECacheState state, unsigned int ttl=300)
Writes a new state to the sync object.
const std::string GetCompoundKey() const
Returns the compound NetCache key of the BLOB.
virtual ~CDataCrawlerCache()
Destructor.
void x_StopHeartbeat()
Stops the heartbeat (thread to keep the sybc object alive)
ECacheState
Cache object states.
@ eState_Created
The remote file is not accessible.
@ eState_NotAccessible
The remote file is being processed.
@ eState_InProgress
No cache data.
ECacheType
Types of cache objects.
void x_StartHeartbeat()
Starts the heartbeat (thread to keep the sybc object alive)
std::string m_BlobKey
The NetCache key of the BLOB object.
size_t GetReadSize() const
Returns size of existing BLOB data.
CNetICacheClient m_SyncCache
Sync objects NetCache instance.
void Remove()
Removes the cache entry.
void x_Initialize()
Initializes the cache instances.
bool m_isImmutableKey
do not try to shorten or otherwise modify the root cache key
void x_Heartbeat()
The heartbeat thread function.
CNetICacheClient x_InitICache(const std::string &cache, const std::string &service, const std::string &db)
Creates a NetCache instance.
virtual CNetICacheClient x_InitSyncCache()=0
Creates the sync NetCache instance.
std::unique_ptr< CNcbiOstream > GetWriteStream()
Returns stream interface to write BLOB data.
std::unique_ptr< std::thread > m_Heartbeat
The heartbeat thread.
std::atomic_bool m_StopHeartbeat
Boolean flag to stop the heartbeat thread.
std::mutex m_SyncCacheMutex
Mutex, controlling the access to the sync cache.
CNetICacheClient m_BlobCache
Blobs NetCache instance.
std::unique_ptr< CNcbiIstream > GetReadStream()
Returns stream interface to read BLOB data.
ECacheState x_ReadState() const
Reads the state of the sync object.
ECacheState GetState() const
Gets the state of the cache object.
virtual std::string x_GetCacheKey()=0
Generates the cache key.
std::string m_SyncKey
The NetCache key of the sync object.
Definition: md5.hpp:46
static CNcbiApplication * Instance(void)
Singleton method.
Definition: ncbiapp.cpp:264
CNcbiRegistry –.
Definition: ncbireg.hpp:913
Client to NetCache server (implements ICache interface)
const string & GetServiceName() const
static string Create(const string &service_name, const string &cache_name, const string &key, const string &subkey, const TVersion &version=null)
Note about the "buf_size" parameter for streams in this API.
Definition: rwstream.hpp:122
@ fOwnReader
Own the underlying reader.
Definition: rwstreambuf.hpp:66
@ fOwnWriter
Own the underlying writer.
Definition: rwstreambuf.hpp:67
Writer-based output stream.
Definition: rwstream.hpp:171
@ fBestReliability
Usually, it's not a problem if something fails to get cached sometimes.
Definition: icache.hpp:85
static void md5(const char *src, const char *out)
Definition: challenge.c:77
const CNcbiRegistry & GetConfig(void) const
Get the application's cached configuration parameters (read-only).
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
CNetService GetService()
IReader * GetReadStream(const string &key, int version, const string &subkey, size_t *blob_size_ptr, const CNamedParameterList *optional=NULL)
Read a lengthy blob via the IReader interface.
virtual string GetCacheName(void) const
virtual void Store(const string &key, int version, const string &subkey, const void *data, size_t size, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Add or replace BLOB.
virtual void Remove(const string &key, int version, const string &subkey)
Remove specific cache entry.
virtual bool Read(const string &key, int version, const string &subkey, void *buf, size_t buf_size)
bool HasBlob(const string &key, const string &subkey, const CNamedParameterList *optional=NULL)
virtual IWriter * GetWriteStream(const string &key, int version, const string &subkey, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Return sequential stream interface to write BLOB data.
virtual void SetFlags(TFlags flags)
Pass flags to the underlying storage.
virtual size_t GetSize(const string &key, int version, const string &subkey)
Check if BLOB exists, return BLOB size.
virtual string GetString(const string &section, const string &name, const string &default_value, TFlags flags=0) const
Get the parameter string value.
Definition: ncbireg.cpp:321
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
CMD5 - class for computing Message Digest version 5 checksums.
static int version
Definition: mdb_load.c:29
const struct ncbi::grid::netcache::search::fields::KEY key
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Process information in the NCBI Registry, including working with configuration files.
Reader-writer based streams.
Definition: type.c:6
else result
Definition: token2.c:20
Modified on Mon Jun 17 05:07:41 2024 by modify_doxy.py rev. 669887