NCBI C++ ToolKit
sqlite_cache.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: sqlite_cache.cpp 47392 2023-03-06 19:00:01Z evgeniev $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Mike DiCuccio
27  *
28  * File Description: SQLITE3 based ICache interface
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbistr.hpp>
34 #include <corelib/ncbimtx.hpp>
35 #include <corelib/ncbitime.hpp>
36 #include <corelib/ncbifile.hpp>
37 #include <util/simple_buffer.hpp>
38 
40 
41 #include <sqlite3.h>
42 
43 
45 
46 
47 /////////////////////////////////////////////////////////////////////////////
48 
51  : m_StopRequest(new SWriteRequest),
52  m_Cache(cache),
53  m_WriteQueue(write_q)
54 {
55 }
56 
58 {
59  LOG_POST(Info << "CSQLITE3_Cache::CWriterThread: shutting down writer thread...");
60  m_WriteQueue.Push(m_StopRequest);
61 }
62 
63 
65 {
66  m_Cache.Purge(m_Cache.GetTimeout());
67 
68  bool vac_ok = m_Cache.Vacuum();
69  if (!vac_ok) {
70  // re-open db with file removal
71  LOG_POST("SQLLite Vacuum failed use recovery procedure");
72  m_Cache.Open(m_Cache.GetDatabase(), true);
73  }
74 
75 
76  bool done = false;
77  while ( !done ) {
79  try {
80  req = m_WriteQueue.Pop();
81  }
82  catch (CException& ex) {
83  LOG_POST(Error << ex);
84  }
85 
86  if ( !req ) {
87  continue;
88  }
89 
90  if (m_StopRequest == req) {
91  done = true;
92  continue;
93  }
94 
95  //
96  // write it!
97  //
98  m_Cache.StoreSynchronous(req->key, req->version, req->subkey,
99  req->buffer.data(), req->buffer.size());
100 /*
101 #ifdef _DEBUG
102  size_t size = m_WriteQueue.GetSize();
103  if (size > 0) {
104  _TRACE("CSQLITE3_Cache::CWriterThread::Main(): "
105  << size << " requests pending...");
106  }
107 #endif
108 */
109  }
110  LOG_POST(Info << "CSQLITE3_Cache::CWriterThread: writer thread stopped");
111  return NULL;
112 }
113 
114 
115 /////////////////////////////////////////////////////////////////////////////
116 
117 struct SStats {
121 };
122 
124 
/// Append @a value to @a out_str as a single-quoted SQL string literal.
///
/// Embedded single quotes are doubled, which is how SQL escapes them inside
/// a '...' literal; without this a key such as "O'Brien" would terminate the
/// literal early and corrupt (or inject into) the surrounding statement.
static
void s_AppendSqlLiteral(const std::string& value, std::string* out_str)
{
    *out_str += '\'';
    for (char c : value) {
        if (c == '\'') {
            *out_str += '\'';
        }
        *out_str += c;
    }
    *out_str += '\'';
}

/// Add BLOB key specific where condition.
///
/// Appends " key = '<key>' AND version = <version> AND subkey = '<subkey>'"
/// to @a out_str, quoting @a key and @a subkey as SQL string literals.
///
/// @param key      BLOB key (may contain any characters, including quotes)
/// @param version  BLOB version, rendered as a decimal integer
/// @param subkey   BLOB subkey (may contain any characters, including quotes)
/// @param out_str  string the condition is appended to (not cleared)
static
void s_MakeKeyCondition(const std::string& key,
                        int version,
                        const std::string& subkey,
                        std::string* out_str)
{
    *out_str += " key = ";
    s_AppendSqlLiteral(key, out_str);
    *out_str += " AND version = ";
    *out_str += std::to_string(version);
    *out_str += " AND subkey = ";
    s_AppendSqlLiteral(subkey, out_str);
}
139 
140 ///////////////////////////////////////////////////////////////////////////////
141 
143 {
144 public:
145  /// compile a statement
146  CSQLITE3_Statement(sqlite3* db, const string& sql);
148 
149  sqlite3_stmt* GetStatement();
150  void ForgetStatement() { m_Stmt = nullptr; }
151 
152  void Bind(int col_or_id, const void* data, size_t size);
153  void Bind(int col_or_id, const string& val);
154  void Bind(int col_or_id, int val);
155 
156  bool Execute();
157  void Reset();
158  int Step();
159 
160  /// return / extract an integer value for a column
161  int GetInt(int col);
163 
164 private:
165  sqlite3* m_DB;
166  sqlite3_stmt* m_Stmt;
167  string m_Sql;
168 
169  void x_Log(int ret, const string& msg)
170  {
171  LOG_POST(Error << msg << ": [" << ret << "] "
172  << sqlite3_errmsg(m_DB));
173  }
174 
175  void x_Throw(int ret, const string& msg)
176  {
177  CNcbiOstrstream ostr;
178  ostr << msg << ": [" << ret << "] "
179  << sqlite3_errmsg(m_DB);
180  string s = string(CNcbiOstrstreamToString(ostr));
182  }
183 
184 private:
187 };
188 
189 
190 CSQLITE3_Statement::CSQLITE3_Statement(sqlite3* db, const string& sql)
191 : m_DB(db), m_Stmt(NULL), m_Sql(sql)
192 {
193  _TRACE("sql: " << sql);
194  int ret = 0;
195 #if (SQLITE_VERSION_NUMBER > 3005001)
196  if ( (ret = sqlite3_prepare_v2(m_DB, sql.c_str(), -1,
197  &m_Stmt, NULL)) != SQLITE_OK) {
198 #else
199  if ( (ret = sqlite3_prepare(m_DB, sql.c_str(), -1,
200  &m_Stmt, NULL)) != SQLITE_OK) {
201 #endif
202  m_Stmt = NULL;
203  x_Throw(ret, "error preparing statement for \"" + sql + "\"");
204  }
205  //_TRACE("exec sql: " << sql);
206 }
207 
209 {
210  if (m_Stmt) {
211  sqlite3_finalize(m_Stmt);
212  }
213 }
214 
216 {
217  return m_Stmt;
218 }
219 
221 {
222  return sqlite3_column_int(m_Stmt, col);
223 }
224 
226 {
227  i = GetInt(0);
228  return *this;
229 }
230 
231 void CSQLITE3_Statement::Bind(int col_or_id, const void* data, size_t size)
232 {
233  _ASSERT(m_Stmt);
234  int ret = 0;
235  if ( (ret = sqlite3_bind_blob(m_Stmt, col_or_id,
236  data, static_cast<int>(size), NULL)) != SQLITE_OK) {
237  x_Throw(ret, "error binding blob");
238  }
239 }
240 
241 void CSQLITE3_Statement::Bind(int col_or_id, const string& val)
242 {
243  _ASSERT(m_Stmt);
244  int ret = 0;
245  if ( (ret = sqlite3_bind_text(m_Stmt, col_or_id,
246  val.data(), static_cast<int>(val.size()), NULL)) != SQLITE_OK) {
247  x_Throw(ret, "error binding string");
248  }
249 }
250 
251 void CSQLITE3_Statement::Bind(int col_or_id, int val)
252 {
253  _ASSERT(m_Stmt);
254  int ret = 0;
255  if ( (ret = sqlite3_bind_int(m_Stmt, col_or_id, val)) != SQLITE_OK) {
256  x_Throw(ret, "error binding int");
257  }
258 }
259 
261 {
262  size_t count = 0;
263  for (;; ++count) {
264  int ret = 0;
265  switch ( (ret = sqlite3_step(m_Stmt)) ) {
266  case SQLITE_ROW:
267  break;
268 
269  case SQLITE_DONE:
270  return true;
271 
272  default:
273  return false;
274  }
275  }
276 
277  return true;
278 }
279 
281 {
282  return sqlite3_step(m_Stmt);
283 }
284 
286 {
287  sqlite3_reset(m_Stmt);
288 #if (SQLITE_VERSION_NUMBER > 3005001)
289  sqlite3_clear_bindings(m_Stmt);
290 #endif
291 }
292 
293 
294 //////////////////////////////////////////////////////////////////////////////
295 
297 {
298  /// local IReader implementation
299  class CBlobReader : public IReader
300  {
301  public:
302  CBlobReader(const unsigned char* buf, size_t size)
303  : m_Pos(0)
304  {
305  m_Buf.resize(size);
306  memcpy(m_Buf.data(), buf, size);
307  }
308 
309  virtual ~CBlobReader()
310  {
311  /**
312  _TRACE(NCBI_CURRENT_FUNCTION << ": read "
313  << m_Pos << "/" << m_Buf.size() << " bytes");
314  **/
315  }
316 
317  ERW_Result Read(void* buf,
318  size_t count,
319  size_t* bytes_read = 0)
320  {
321  if ( !bytes_read ) {
322  return eRW_Success;
323  }
324 
325  count = min(count, m_Buf.size() - m_Pos);
326  *bytes_read = count;
327  if (count) {
328  memcpy(buf, &m_Buf[m_Pos], count);
329  m_Pos += count;
330  return eRW_Success;
331  } else {
332  return eRW_Eof;
333  }
334  }
335 
336  ERW_Result PendingCount(size_t* count)
337  {
338  *count = m_Buf.size() - m_Pos;
339  return eRW_Success;
340  }
341 
342  private:
343  CSimpleBuffer m_Buf;
344  size_t m_Pos;
345  };
346 
347  ///
348  /// retrieve our row and create a blob reader
349  /// FIXME: use incremental I/O in the future
350  ///
351 
352  CStopWatch sw;
353  sw.Start();
354 
355  unique_ptr<IReader> reader;
356  int size = sqlite3_column_bytes(stmt.GetStatement(), col);
357  const void* data = sqlite3_column_blob(stmt.GetStatement(), col);
358  if (data) {
359  reader.reset(new CBlobReader((const unsigned char*)data, size));
362  }
363  double e = sw.Elapsed();
364  s_CacheStats.total_time.Add((int)(e * 1000));
365  //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(): read = " << sw.Elapsed() * 1000 << " msec");
366 
367  return reader.release();
368 }
369 
370 
371 ///////////////////////////////////////////////////////////////////////////////
373 {
374  switch (GetErrCode())
375  {
376  case eUnknown: return "eUnknown";
377  case eInitError: return "eInitError";
378  case eNotImplemented: return "eNotImplemented";
379  default: return CException::GetErrCodeString();
380  }
381 }
382 
383 ///////////////////////////////////////////////////////////////////////////////
384 
386  : m_Timeout(7 * (24*60*60))
387  , m_TimeStampFlag(kDefaultTimestampPolicy)
388  , m_VersionFlag(eKeepAll)
389  , m_DB(NULL)
390 {
391 // m_WriterThread.Reset(new CWriterThread(*this, m_WriteQueue));
392 // m_WriterThread->Run();
393 }
394 
396 {
398 
399  // stop the write thread and queue
400  if (!m_WriterThread.IsNull()) {
401  m_WriterThread->Stop();
402  m_WriterThread->Join();
403  }
404 
405  if (m_DB) {
406 
407  if (m_Stmt_Store)
408  m_Stmt_Store->ForgetStatement();
410  m_Stmt_HasBlobs_key->ForgetStatement();
412  m_Stmt_HasBlobs_key_subkey->ForgetStatement();
414  m_Stmt_GetBlobAccess->ForgetStatement();
416  m_Stmt_GetReadStream->ForgetStatement();
418  m_Stmt_SetTimestamp->ForgetStatement();
419 
420 
421  // close all pending statements
422  size_t count = 0;
423  sqlite3_stmt *stmt = NULL;
424  while ( (stmt = sqlite3_next_stmt(m_DB, 0))!=0 ){
425  sqlite3_finalize(stmt);
426  ++count;
427  }
428  if (count) {
429  LOG_POST(Warning << "CSQLITE3_Cache::~CSQLITE3_Cache(): flushed "
430  << count << " pending statements");
431  } else {
432  _TRACE("CSQLITE3_Cache::~CSQLITE3_Cache(): no pending statements");
433  }
434 
435 
436  // close the database
437  int ret = sqlite3_close(m_DB);
438  m_DB = NULL;
439  if (ret != SQLITE_OK) {
440  _ASSERT(ret != SQLITE_BUSY);
441  LOG_POST(Error << "CSQLITE3_Cache::~CSQLITE3_Cache(): "
442  "error closing database '" << m_Database << "'");
443  }
444  }
445 
446  size_t items = s_CacheStats.objects_read.Get();
447  size_t bytes = s_CacheStats.bytes_read.Get();
448  size_t msec = s_CacheStats.total_time.Get();
449 
450  if (items) {
451  LOG_POST(Info << "CSQLITE3_Cache::~CSQLITE3_Cache(): read "
452  << items << " items / "
453  << bytes << " bytes / "
454  << msec << " msec / "
455  << bytes / double(items) << " bytes/item / "
456  << msec / double(items) << " msec/item / "
457  );
458  }
459 }
460 
462 {
463  CMutexGuard guard(m_Mutex);
464 
465  CSQLITE3_Statement stmt(m_DB, "VACUUM");
466  if (!stmt.Execute() ) {
467  LOG_POST(Warning << "Failed to vacuum the sqllite3 database:" << m_Database);
468  return false;
469  }
470  return true;
471 }
472 
473 
474 void CSQLITE3_Cache::Open(const string& database, bool remove)
475 {
476  CStopWatch sw;
477  sw.Start();
478 
479  LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << database << ")");
480 
481 
482 
483  //
484  // open the database
486 
487  if (m_DB) {
488  /*int ret = */sqlite3_close(m_DB);
489  m_DB = 0;
490  }
491 
492  if (remove) {
493  CDirEntry de(m_Database);
494  de.Remove();
495  }
496 
497  // check if target dir is present
498  //
499  {
500  string dir;
502 
503  CDir de(dir);
504  if (!de.Exists()) {
505  de.Create();
506  }
507  }
508 
509 
510  #if (SQLITE_VERSION_NUMBER > 3005001)
511  int ret = sqlite3_open_v2(m_Database.c_str(), &m_DB,
512  SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
513  NULL);
514  #else
515  int ret = sqlite3_open(m_Database.c_str(), &m_DB);
516  #endif
517  if (ret != SQLITE_OK) {
518  string msg("error opening database '");
519  msg += m_Database;
520  msg += "': ";
521  msg += sqlite3_errmsg(m_DB);
522 
523  try {
524  CDirEntry de(m_Database);
525  de.Remove();
526  }
527  catch (std::exception& e)
528  {
529  LOG_POST(Error << "Attempt to delete damaged DB failed: " << e.what());
530  }
531 
533  }
534 
535 
536 
537  //
538  // standard db connection setup fior SQLite
539  //
540  if ( !CSQLITE3_Statement(m_DB, "PRAGMA journal_mode = OFF").Execute() ) {
542  "failed to set journaling mode");
543  }
544 
545  if ( !CSQLITE3_Statement(m_DB, "PRAGMA temp_store = MEMORY").Execute() ) {
547  "failed to set temp store");
548  }
549 
550  if ( !CSQLITE3_Statement(m_DB, "PRAGMA page_size = 32768").Execute() ) {
552  "failed to set page_size");
553  }
554 
555  // synchronous = OFF gives major speed difference when we update timestamps
556  //
557  if ( !CSQLITE3_Statement(m_DB, "PRAGMA synchronous = OFF").Execute() ) {
559  "failed to set synchronous mode");
560  }
561  if ( !CSQLITE3_Statement(m_DB, "PRAGMA count_changes = OFF").Execute() ) {
563  "failed to disable count chnages mode");
564  }
565 
566 
567  //
568  // we now have an open connection
569  // we can try to purge some data
570  //
571  if (CSQLITE3_Statement(m_DB, "PRAGMA table_info(CacheBlobs)").Step() == SQLITE_ROW) {
572  } else {
573  // create our table
575  (m_DB,
576  "CREATE TABLE CacheBlobs " \
577  "(" \
578  " key varchar(256) NOT NULL, " \
579  " version int NOT NULL, " \
580  " subkey varchar(256) NOT NULL, " \
581  "" \
582  " timestamp int NOT NULL, " \
583  " data blob NULL " \
584  ")");
585  if ( !stmt.Execute() ) {
587  "failed to initialize cache");
588  }
589  }
590 
591  // check our indices as well
592  // some earlier versions were created without indices
593  if (CSQLITE3_Statement(m_DB, "PRAGMA index_info(CacheBlobs_pk)").Step() != SQLITE_ROW) {
595  (m_DB,
596  "CREATE UNIQUE INDEX CacheBlobs_pk ON CacheBlobs(key, version, subkey)");
597  if ( !stmt.Execute() ) {
599  "failed to initialize cache: failed to create PK index");
600  }
601  }
602 
603  if (CSQLITE3_Statement(m_DB, "PRAGMA index_info(CacheBlobs_timestamp)").Step() != SQLITE_ROW) {
605  (m_DB,
606  "CREATE INDEX CacheBlobs_timestamp ON CacheBlobs(timestamp)");
607  if ( !stmt.Execute() ) {
609  "failed to initialize cache: failed to create timestamp index");
610  }
611  }
612 
613  // start background writer (if it is not started yet)
614  if (m_WriterThread.IsNull()) {
615  m_WriterThread.Reset(new CWriterThread(*this, m_WriteQueue));
616  m_WriterThread->Run();
617  }
618 
619  LOG_POST(Info << "CSQLITE3_Cache::Open(): " << sw.Elapsed() << " seconds");
620 }
621 
622 
624 {
625  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
626  return (m_DB != NULL);
627 }
628 
629 
631 {
632  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
633  return m_Database;
634 }
635 
636 
638  unsigned int timeout,
639  unsigned int max_timeout)
640 {
641  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
642  CMutexGuard guard(m_Mutex);
643  if (policy) {
644  m_TimeStampFlag = policy;
645  } else {
647  }
648 
649  m_Timeout = timeout;
650 }
651 
652 
654 {
655  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
656  return m_TimeStampFlag;
657 }
658 
659 
661 {
662  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
663  return m_Timeout;
664 }
665 
666 
668 {
669  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
670  m_VersionFlag = policy;
671 }
672 
673 
675 {
676  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
677  return m_VersionFlag;
678 }
679 
680 
682  int version,
683  const string& subkey)
684 {
685  CMutexGuard LOCK(m_Mutex);
686  time_t time = CTime(CTime::eCurrent).GetTimeT();
687 
688  if ( !m_Stmt_SetTimestamp.get() ) {
689  string sql =
690  "UPDATE CacheBlobs SET timestamp = ?1 WHERE "
691  "key = ?2 AND version = ?3 AND subkey = ?4";
692  //s_MakeKeyCondition(key, version, subkey, &sql);
694  } else {
695  m_Stmt_SetTimestamp->Reset();
696  }
697 
698  //CSQLITE3_Statement stmt2(m_DB, sql);
699  m_Stmt_SetTimestamp->Bind(1, int(time));
700  m_Stmt_SetTimestamp->Bind(2, key);
701  m_Stmt_SetTimestamp->Bind(3, version);
702  m_Stmt_SetTimestamp->Bind(4, subkey);
703  if ( !m_Stmt_SetTimestamp->Execute() ) {
704  LOG_POST(Error << "failed to update timestamp on cache blob: "
705  << "\"" << key << "\", " << version << ", \""
706  << subkey << "\": " << sqlite3_errmsg(m_DB));
707  } else {
708  /**
709  LOG_POST(Info
710  << "\"" << key << "\", " << version << ", \""
711  << subkey << "\": timestamp=" << time);
712  **/
713  }
714 }
715 
716 void CSQLITE3_Cache::Store(const string& key,
717  int version,
718  const string& subkey,
719  const void* data,
720  size_t size,
721  unsigned int time_to_live,
722  const string& owner)
723 {
724  //
725  // prepare our write reque
727  req->key = key;
728  req->version = version;
729  req->subkey = subkey;
730  req->buffer.resize(size);
731  memcpy(req->buffer.data(), data, size);
732 
733  // push to our queue
734  m_WriteQueue.Push(req);
735 }
736 
737 
738 
740  int version,
741  const string& subkey,
742  const void* data,
743  size_t size)
744 {
745  CMutexGuard LOCK(m_Mutex);
746 
747  /**
748  LOG_POST(Info << NCBI_CURRENT_FUNCTION
749  << "(\"" << key << "\", " << version << ", \""
750  << subkey << "\", " << data << ", " << size << "): thread="
751  << CThread::GetSelf());
752  **/
753  _ASSERT(m_DB);
754 
756  Purge(key, subkey, 0);
757  }
758 
759  string sql;
760  int ret = 0;
761  time_t timestamp = CTime(CTime::eCurrent).GetTimeT();
762 
763  //
764  // insert the row into cache data
765  // we scan first to see if the row already exists
766  //
767  if ( !m_Stmt_Store.get() ) {
769  "INSERT OR REPLACE INTO CacheBlobs (key, version, subkey, timestamp, data) "
770  "VALUES( ?1, ?2, ?3, ?4, ?5 )"));
771  }
772 
773  m_Stmt_Store->Reset();
774  m_Stmt_Store->Bind(1, key);
775  m_Stmt_Store->Bind(2, version);
776  m_Stmt_Store->Bind(3, subkey);
777  m_Stmt_Store->Bind(4, int(timestamp));
778  m_Stmt_Store->Bind(5, data, size);
779 
780  // execute...
781  if ( (ret = m_Stmt_Store->Step()) != SQLITE_DONE) {
782  LOG_POST(Error << "failed to write " << size << " bytes: "
783  << sql << ": [" << ret << "] " << sqlite3_errmsg(m_DB));
784  }
785 }
786 
787 
788 size_t CSQLITE3_Cache::GetSize(const string& key,
789  int version,
790  const string& subkey)
791 {
793 
794  string sql = "SELECT data FROM CacheBlobs WHERE ";
797  if (stmt.Step() == SQLITE_ROW) {
798  return sqlite3_column_bytes(stmt.GetStatement(), 0);
799  }
800 
801  return 0;
802 }
803 
804 
805 bool CSQLITE3_Cache::Read(const string& key,
806  int version,
807  const string& subkey,
808  void* buf,
809  size_t buf_size)
810 {
812 
813  string sql = "SELECT data FROM CacheBlobs WHERE ";
815 
817  if (stmt.Step() == SQLITE_ROW) {
818  size_t size = sqlite3_column_bytes(stmt.GetStatement(), 0);
819  size = min(size, buf_size);
820  memcpy(buf,
821  sqlite3_column_blob(stmt.GetStatement(), 0),
822  size);
823 
824  /// set timestamp
827  }
828  return true;
829  }
830 
831  return false;
832 }
833 
834 
836  int version,
837  const string& subkey)
838 {
839  CMutexGuard LOCK(m_Mutex);
840  //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << key << ", " << version << ", " << subkey << ")");
841 
842  //
843  // retrieve our row and create a blob reader
844  // FIXME: use incremental I/O in the future
845  //
846  if ( !m_Stmt_GetReadStream.get() ) {
847  string sql =
848  "SELECT data FROM CacheBlobs WHERE "
849  "key = ?1 AND version = ?2 AND subkey = ?3"
850  ;
852  } else {
853  m_Stmt_GetReadStream->Reset();
854  }
855  //s_MakeKeyCondition(key, version, subkey, &sql);
856  //CSQLITE3_Statement stmt(m_DB, sql);
857  m_Stmt_GetReadStream->Bind(1, key);
858  m_Stmt_GetReadStream->Bind(2, version);
859  m_Stmt_GetReadStream->Bind(3, subkey);
860  if (m_Stmt_GetReadStream->Step() == SQLITE_ROW) {
861  unique_ptr<IReader> reader(GetBlobReader(*m_Stmt_GetReadStream, 0));
862 
863  /// set timestamp
866  }
867  return reader.release();
868  }
869 
870  return NULL;
871 }
872 
874  const string& /*key*/,
875  const string& /*subkey*/,
876  int* /*version*/,
877  EBlobVersionValidity* /*validity*/)
878 {
879  // ICache last valid version protocol is not implemented in GBench
880  // (this is ok it is optimization only important for network cache)
881  NCBI_THROW(CSQLITE3_ICacheException, eNotImplemented,
882  "CSQLITE3_Cache::GetReadStream(key, subkey, version, validity) "
883  "is not implemented");
884 }
885 
886 void CSQLITE3_Cache::SetBlobVersionAsValid(const string& /* key */,
887  const string& /* subkey */,
888  int /* version */)
889 {
890  // ICache last valid version protocol is not implemented in GBench
891  // (this is ok it is optimization only important for network cache)
892  NCBI_THROW(CSQLITE3_ICacheException, eNotImplemented,
893  "CSQLITE3_Cache::SetBlobVersionAsValid(key, subkey, version) "
894  "is not implemented");
895 }
896 
897 
899  int version,
900  const string& subkey,
901  unsigned int time_to_live,
902  const string& owner)
903 {
904  //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(" << key << ", " << version << ", " << subkey << ", " << time_to_live << ", " << owner << ")");
905 
906  class CBlobWriter : public IWriter
907  {
908  public:
910  const string& key,
911  int version,
912  const string& subkey)
913  : m_Cache(cache)
914  , m_Key(key)
915  , m_Version(version)
916  , m_Subkey(subkey)
917  , m_Flushed(false)
918  {
919  }
920 
921  ~CBlobWriter()
922  {
923  if ( !m_Flushed ) {
924  Flush();
925  }
926  }
927 
928  ERW_Result Write(const void* buf,
929  size_t count,
930  size_t* bytes_written = 0)
931  {
932  if ( !m_Flushed ) {
933  m_Data.insert(m_Data.end(),
934  (const unsigned char*)(buf),
935  (const unsigned char*)(buf) + count);
936  if (bytes_written) {
937  *bytes_written = count;
938  }
939  return eRW_Success;
940  } else {
941  return eRW_Error;
942  }
943  }
944 
945  ERW_Result Flush()
946  {
947  if (m_Data.size()) {
948  m_Cache.Store(m_Key, m_Version, m_Subkey,
949  &m_Data[0], m_Data.size());
950  //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(): wrote " << m_Data.size() << " bytes");
951  }
952  m_Flushed = true;
953  return eRW_Success;
954  }
955 
956  private:
957  CSQLITE3_Cache& m_Cache;
958  string m_Key;
959  int m_Version;
960  string m_Subkey;
961  vector<unsigned char> m_Data;
962  bool m_Flushed;
963  };
964 
965  return new CBlobWriter(*this, key, version, subkey);
966 }
967 
968 
969 void CSQLITE3_Cache::Remove(const string& key)
970 {
971  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
972 
973  // create transaction
974  string sql = "DELETE FROM CacheBlobs WHERE key = '";
975  sql += key;
976  sql += "'";
978  stmt.Execute();
979 }
980 
981 
982 void CSQLITE3_Cache::Remove(const string& key,
983  int version,
984  const string& subkey)
985 {
986  //LOG_POST(Info << NCBI_CURRENT_FUNCTION);
987 
988  /// create transaction
989  string sql = "DELETE FROM CacheBlobs WHERE ";
992  stmt.Execute();
993 }
994 
995 
996 time_t CSQLITE3_Cache::GetAccessTime(const string& key,
997  int version,
998  const string& subkey)
999 {
1001 
1002  string sql = "SELECT timestamp FROM CacheBlobs WHERE ";
1005  if (stmt.Step() == SQLITE_ROW) {
1006  return stmt.GetInt(0);
1007  }
1008 
1009  return 0;
1010 }
1011 
1012 
1013 void CSQLITE3_Cache::Purge(time_t access_timeout)
1014 {
1016 
1017  if (access_timeout == 0) {
1018  //x_TruncateDB();
1019  return;
1020  }
1021 
1022  CTime time_stamp(CTime::eCurrent);
1023  time_t curr = time_stamp.GetTimeT();
1024  int timeout = GetTimeout();
1025  curr -= timeout;
1026 
1027  string sql = "DELETE FROM CacheBlobs WHERE timestamp < ?1";
1029  stmt.Bind(1, int(curr));
1030  if (stmt.Step() == SQLITE_DONE) {
1031  int count = sqlite3_changes(m_DB);
1032  LOG_POST(Info << "CSQLITE3_Cache::Purge(): "
1033  << count << " items purged");
1034  }
1035 }
1036 
1037 
1038 void CSQLITE3_Cache::Purge(const string& key,
1039  const string& subkey,
1040  time_t access_timeout)
1041 {
1043 
1044  if (access_timeout == 0) {
1045  //x_TruncateDB();
1046  return;
1047  }
1048 
1049  CTime time_stamp(CTime::eCurrent);
1050  time_t curr = time_stamp.GetTimeT();
1051  int timeout = GetTimeout();
1052  curr -= timeout;
1053 
1054  string sql =
1055  "DELETE FROM CacheBlobs WHERE "
1056  " timestamp < ?1 ";
1057 
1058  if (!key.empty()) {
1059  sql += " AND key = '";
1060  sql += key;
1061  sql += "'";
1062  }
1063 
1064  if (!subkey.empty()) {
1065  sql += " AND subkey = '";
1066  sql += subkey;
1067  sql += "'";
1068  }
1070  stmt.Bind(1, int(curr));
1071  if (stmt.Step() == SQLITE_DONE) {
1072  int count = sqlite3_changes(m_DB);
1073  LOG_POST(Info << "CSQLITE3_Cache::Purge(): "
1074  << count << " items purged");
1075  }
1076 }
1077 
1078 
1080 {
1081  if ( !params ) {
1082  return false;
1083  }
1084  const TCacheParams* driver = params->FindNode("driver");
1085  if (!driver || driver->GetValue().value != kSQLITE3_BlobCacheDriverName) {
1086  return false;
1087  }
1088  const TCacheParams* driver_params =
1090  if ( !driver_params ) {
1091  return false;
1092  }
1093  const TCacheParams* path = driver_params->FindNode("database");
1094  if (!path) {
1095  return false;
1096  }
1097  const string& database = path->GetValue().value;
1098 
1099  string base1, base2;
1100  CDirEntry::SplitPath(database, 0, &base1, 0);
1101  CDirEntry::SplitPath(m_Database, 0, &base2, 0);
1102 
1103  if (base1 == base2)
1104  return true;
1105  return false;
1106 }
1107 
1108 
1109 bool CSQLITE3_Cache::HasBlobs(const string& key,
1110  const string& subkey)
1111 {
1112  CMutexGuard LOCK(m_Mutex);
1113  //LOG_POST(Info << NCBI_CURRENT_FUNCTION << "(\"" << key << "\", \"" << subkey << "\")");
1114 
1116  if ( !m_Stmt_HasBlobs_key.get() ) {
1117  m_Stmt_HasBlobs_key.reset
1118  (new CSQLITE3_Statement(m_DB,
1119  "SELECT timestamp FROM CacheBlobs WHERE "
1120  "key = ?1"));
1121  }
1122 
1123  if ( !m_Stmt_HasBlobs_key_subkey.get() ) {
1125  (new CSQLITE3_Statement(m_DB,
1126  "SELECT timestamp FROM CacheBlobs WHERE "
1127  "key = ?1 AND subkey = ?2"));
1128  }
1129 
1130  if (subkey.empty()) {
1132  } else {
1134  }
1135 
1136  stmt->Reset();
1137  stmt->Bind(1, key);
1138  if ( !subkey.empty() ) {
1139  stmt->Bind(2, subkey);
1140  }
1141  return (stmt->Step() == SQLITE_ROW);
1142 }
1143 
1144 
1146  int version,
1147  const string& subkey,
1148  SBlobAccessDescr* blob_descr)
1149 {
1150  CMutexGuard LOCK(m_Mutex);
1151  /**
1152  LOG_POST(Info << NCBI_CURRENT_FUNCTION
1153  << "(\"" << key << "\", " << version << ", \"" << subkey << "\"): thread="
1154  << CThread::GetSelf());
1155  **/
1156 
1157  blob_descr->reader.reset();
1158  blob_descr->blob_size = 0;
1159  blob_descr->blob_found = false;
1160 
1161  string sql;
1162  time_t time = CTime(CTime::eCurrent).GetTimeT();
1163  time_t timeout = time - GetTimeout();
1164 
1165  // check to see if the row exists in the cache attributes
1166  // if so, we intend on updating the timestamp to be now
1167  int this_timestamp = 0;
1168 
1169  // we have a blob
1170  if ( !m_Stmt_GetBlobAccess.get() ) {
1171  sql =
1172  "SELECT timestamp, data FROM CacheBlobs WHERE "
1173  "key = ?1 AND version = ?2 AND subkey = ?3";
1174  //s_MakeKeyCondition(key, version, subkey, &sql);
1176  } else {
1177  m_Stmt_GetBlobAccess->Reset();
1178  }
1179 
1180  m_Stmt_GetBlobAccess->Bind(1, key);
1181  m_Stmt_GetBlobAccess->Bind(2, version);
1182  m_Stmt_GetBlobAccess->Bind(3, subkey);
1183  if (m_Stmt_GetBlobAccess->Step() == SQLITE_ROW) {
1184  // read and process our timestamp
1185  this_timestamp = m_Stmt_GetBlobAccess->GetInt(0);
1186  if (this_timestamp < timeout) {
1187  Remove(key, version, subkey);
1188  } else {
1189  // read and process our blob
1190  blob_descr->blob_size =
1191  sqlite3_column_bytes(m_Stmt_GetBlobAccess->GetStatement(), 1);
1192  blob_descr->blob_found = true;
1193 
1194  if (blob_descr->buf &&
1195  blob_descr->buf_size >= blob_descr->blob_size) {
1196  memcpy(blob_descr->buf,
1197  sqlite3_column_blob(m_Stmt_GetBlobAccess->GetStatement(), 1),
1198  blob_descr->blob_size);
1199  //LOG_POST(Info << " direct copy: " << blob_descr->blob_size << " bytes");
1200  } else {
1201  blob_descr->reader.reset(GetBlobReader(*m_Stmt_GetBlobAccess, 1));
1202  /**
1203  LOG_POST(Info << " stream read: " << blob_descr->blob_size
1204  << " bytes / " << blob_descr->buf_size << " bytes in buffer");
1205  **/
1206  }
1207  }
1208 
1209  // set timestamp
1212  }
1213  }
1214 }
1215 
1216 
1217 /// @name unimplemented
1218 /// @{
1220  int version,
1221  const string& subkey,
1222  string* owner)
1223 {
1225  _ASSERT(owner);
1226  owner->erase(); // not supported in this implementation
1227 }
1228 
1229 void CSQLITE3_Cache::SetMemBufferSize(unsigned int buf_size)
1230 {
1231  /// noop
1232 }
1233 
1234 
1235 /// @}
1236 
CAtomicCounter –.
Definition: ncbicntr.hpp:71
CDirEntry –.
Definition: ncbifile.hpp:262
CDir –.
Definition: ncbifile.hpp:1696
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
Definition: ncbistre.hpp:802
CRef –.
Definition: ncbiobj.hpp:618
thread for delayed writes
SQLITE3 cache implementation.
SQLITE3 ICache exception.
sqlite3_stmt * m_Stmt
sqlite3_stmt * GetStatement()
void x_Log(int ret, const string &msg)
void Bind(int col_or_id, const void *data, size_t size)
CSQLITE3_Statement & operator>>(int &i)
int GetInt(int col)
return / extract an integer value for a column
CSQLITE3_Statement(sqlite3 *db, const string &sql)
compile a statement
CSQLITE3_Statement(const CSQLITE3_Statement &)
void x_Throw(int ret, const string &msg)
CSQLITE3_Statement & operator=(const CSQLITE3_Statement &)
Reallocable memory buffer (no memory copy overhead) Mimics vector<>, without the overhead of explicit...
CStopWatch –.
Definition: ncbitime.hpp:1937
void Push(const TValue &elem, const CTimeSpan *timeout=NULL)
Add new element to the end of queue.
TValue Pop(const CTimeSpan *timeout=NULL)
Retrieve an element from the queue.
CTime –.
Definition: ncbitime.hpp:296
definition of a Culling tree
Definition: ncbi_tree.hpp:100
EKeepVersions
Whether to keep already cached versions of the BLOB when storing another version of it (not necessarily a ...
Definition: icache.hpp:162
@ eDropOlder
Delete the earlier (than the one being stored) versions of the BLOB.
Definition: icache.hpp:167
@ eDropAll
Delete all versions of the BLOB, even those which are newer than the one being stored.
Definition: icache.hpp:170
EBlobVersionValidity
BLOB version existence and validity – from the point of view of the underlying cache implementation.
Definition: icache.hpp:274
@ fTimeStampOnRead
Timestamp is updated on every access (read or write)
Definition: icache.hpp:110
int TTimeStampFlags
Holds a bitwise OR of "ETimeStampFlags".
Definition: icache.hpp:128
A very basic data-read interface.
A very basic data-write interface.
The NCBI C++ standard methods for dealing with std::string.
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:90
static char sql[1024]
Definition: putdata.c:19
static HSTMT stmt
Definition: rebindpar.c:12
char data[12]
Definition: iconv.c:80
CNcbiIstream & operator>>(CNcbiIstream &s, const getcontig &c)
string
Definition: cgiapp.hpp:690
#define NULL
Definition: ncbistd.hpp:225
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
TValue Get(void) const THROWS_NONE
Get atomic counter value.
Definition: ncbicntr.hpp:168
CRef< CWriterThread > m_WriterThread
void Stop()
Queue a request to stop the background writer. Asynchronous! The thread may not have stopped yet when this call returns ...
virtual time_t GetAccessTime(const string &key, int version, const string &subkey)
Return last access time for the specified cache entry.
unsigned int m_Timeout
virtual ~CSQLITE3_Cache()
virtual void GetBlobAccess(const string &key, int version, const string &subkey, SBlobAccessDescr *blob_descr)
Get BLOB access using BlobAccessDescr.
virtual void GetBlobOwner(const string &key, int version, const string &subkey, string *owner)
Retrieve BLOB owner.
unique_ptr< CSQLITE3_Statement > m_Stmt_Store
precompiled statements these are used to speed up time-critical accesses
virtual void SetTimeStampPolicy(TTimeStampFlags policy, unsigned int timeout, unsigned int max_timeout=0)
Set timestamp update policy.
virtual IReader * GetReadStream(const string &key, int version, const string &subkey)
Return sequential stream interface to read BLOB data.
bool Vacuum()
Vacuum the database (should be open first)
void * Main()
Derived (user-created) class must provide a real thread function.
unique_ptr< CSQLITE3_Statement > m_Stmt_GetReadStream
virtual bool Read(const string &key, int version, const string &subkey, void *buf, size_t buf_size)
Fetch the BLOB.
void x_SetTimestamp(const string &key, int version, const string &subkey)
virtual TTimeStampFlags GetTimeStampPolicy() const
Get timestamp policy.
virtual EKeepVersions GetVersionRetention() const
Get version retention.
void SetMemBufferSize(unsigned int buf_size)
Set size of the intermidiate BLOB memory buffer.
virtual void Remove(const string &key)
virtual void Store(const string &key, int version, const string &subkey, const void *data, size_t size, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Add or replace BLOB.
virtual void SetVersionRetention(EKeepVersions policy)
Set version retention policy.
string m_Database
filename of the database
void StoreSynchronous(const string &key, int version, const string &subkey, const void *data, size_t size)
TTimeStampFlags m_TimeStampFlag
virtual size_t GetSize(const string &key, int version, const string &subkey)
Check if BLOB exists, return BLOB size.
unique_ptr< CSQLITE3_Statement > m_Stmt_GetBlobAccess
virtual int GetTimeout() const
Get expiration timeout.
virtual void SetBlobVersionAsValid(const string &key, const string &subkey, int version)
virtual bool SameCacheParams(const TCacheParams *params) const
virtual IWriter * GetWriteStream(const string &key, int version, const string &subkey, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Specifics of this IWriter implementation is that IWriter::Flush here cannot be called twice,...
const string kSQLITE3_BlobCacheDriverName
unique_ptr< CSQLITE3_Statement > m_Stmt_HasBlobs_key
unique_ptr< CSQLITE3_Statement > m_Stmt_HasBlobs_key_subkey
TSqliteDb * m_DB
TWriteQueue m_WriteQueue
unique_ptr< CSQLITE3_Statement > m_Stmt_SetTimestamp
virtual void Purge(time_t access_timeout)
Delete all BLOBs older than specified.
virtual string GetCacheName(void) const
CWriterThread(CSQLITE3_Cache &cache, TWriteQueue &write_q)
virtual bool IsOpen() const
virtual bool HasBlobs(const string &key, const string &subkey)
Check if any BLOB exists (any version)
EKeepVersions m_VersionFlag
virtual const char * GetErrCodeString(void) const override
Get error code interpreted as text.
void Open(const string &database, bool remove=false)
Open cache.
#define _TRACE(message)
Definition: ncbidbg.hpp:122
#define NCBI_CURRENT_FUNCTION
Get current function name.
Definition: ncbidiag.hpp:142
#define LOG_POST(message)
This macro is deprecated and it's strongly recomended to move in all projects (except tests) to macro...
Definition: ncbidiag.hpp:226
void Error(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1197
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * GetErrCodeString(void) const
Get error code interpreted as text.
Definition: ncbiexpt.cpp:444
void Info(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1185
virtual bool Remove(TRemoveFlags flags=eRecursive) const
Remove a directory entry.
Definition: ncbifile.cpp:2595
virtual bool Exists(void) const
Check if directory "dirname" exists.
Definition: ncbifile.hpp:4066
bool Create(TCreateFlags flags=fCreate_Default) const
Create the directory using "dirname" passed in the constructor.
Definition: ncbifile.cpp:4071
static void SplitPath(const string &path, string *dir=0, string *base=0, string *ext=0)
Split a path string into its basic components.
Definition: ncbifile.cpp:358
void Read(CObjectIStream &in, TObjectPtr object, const CTypeRef &type)
Definition: serial.cpp:60
void Write(CObjectOStream &out, TConstObjectPtr object, const CTypeRef &type)
Definition: serial.cpp:55
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
ERW_Result
Result codes for I/O operations.
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Error
Unrecoverable error, no retry possible.
@ eRW_Success
Everything is okay, I/O completed.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5078
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2775
time_t GetTimeT(void) const
Get time in time_t format.
Definition: ncbitime.cpp:1396
void Start(void)
Start the timer.
Definition: ncbitime.hpp:2764
@ eCurrent
Use current time. See also CCurrentTime.
Definition: ncbitime.hpp:300
const TTreeType * FindNode(const TKeyType &key, TNodeSearchMode sflag=eImmediateAndTop) const
Search for node.
Definition: ncbi_tree.hpp:970
const TValue & GetValue(void) const
Return node's value.
Definition: ncbi_tree.hpp:184
static CStopWatch sw
use only n Cassandra database for the lookups</td > n</tr > n< tr > n< td > yes</td > n< td > do not use tables BIOSEQ_INFO and BLOB_PROP in the Cassandra database
char * buf
int i
const string version
version string
Definition: variables.hpp:66
string Execute(const string &cmmd, const vector< string > &args, const string &data=kEmptyStr)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const struct ncbi::grid::netcache::search::fields::SUBKEY subkey
Defines classes: CDirEntry, CFile, CDir, CSymLink, CMemoryFile, CFileUtil, CFileLock,...
Multi-threading – mutexes; rw-locks; semaphore.
Defines: CTimeFormat - storage class for time format.
T min(T x_, T y_)
#define count
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static IReader * GetBlobReader(CSQLITE3_Statement &stmt, int col)
static void s_MakeKeyCondition(const string &key, int version, const string &subkey, string *out_str)
Add BLOB key specific where condition.
static SStats s_CacheStats
delayed write request object
BLOB access descriptor.
Definition: icache.hpp:332
unique_ptr< IReader > reader
Definition: icache.hpp:347
CAtomicCounter total_time
CAtomicCounter objects_read
CAtomicCounter bytes_read
#define _ASSERT
done
Definition: token1.c:1
Modified on Fri Sep 20 14:57:27 2024 by modify_doxy.py rev. 669887