59 LOG_POST(
Info <<
"CSQLITE3_Cache::CWriterThread: shutting down writer thread...");
66 m_Cache.Purge(m_Cache.GetTimeout());
68 bool vac_ok = m_Cache.Vacuum();
71 LOG_POST(
"SQLLite Vacuum failed use recovery procedure");
72 m_Cache.Open(m_Cache.GetDatabase(),
true);
90 if (m_StopRequest == req) {
98 m_Cache.StoreSynchronous(req->key, req->version, req->subkey,
99 req->buffer.data(), req->buffer.size());
110 LOG_POST(
Info <<
"CSQLITE3_Cache::CWriterThread: writer thread stopped");
132 *out_str +=
" key = ";
133 *out_str +=
"'" +
key +
"'";
134 *out_str +=
" AND version = ";
136 *out_str +=
" AND subkey = ";
137 *out_str +=
"'" +
subkey +
"'";
149 sqlite3_stmt* GetStatement();
152 void Bind(
int col_or_id,
const void*
data,
size_t size);
153 void Bind(
int col_or_id,
const string&
val);
154 void Bind(
int col_or_id,
int val);
172 << sqlite3_errmsg(
m_DB));
178 ostr <<
msg <<
": [" << ret <<
"] "
179 << sqlite3_errmsg(
m_DB);
195 #if (SQLITE_VERSION_NUMBER > 3005001)
196 if ( (ret = sqlite3_prepare_v2(
m_DB,
sql.c_str(), -1,
199 if ( (ret = sqlite3_prepare(
m_DB,
sql.c_str(), -1,
203 x_Throw(ret,
"error preparing statement for \"" +
sql +
"\"");
222 return sqlite3_column_int(
m_Stmt, col);
235 if ( (ret = sqlite3_bind_blob(
m_Stmt, col_or_id,
237 x_Throw(ret,
"error binding blob");
245 if ( (ret = sqlite3_bind_text(
m_Stmt, col_or_id,
246 val.data(),
static_cast<int>(
val.size()),
NULL)) != SQLITE_OK) {
247 x_Throw(ret,
"error binding string");
255 if ( (ret = sqlite3_bind_int(
m_Stmt, col_or_id,
val)) != SQLITE_OK) {
256 x_Throw(ret,
"error binding int");
265 switch ( (ret = sqlite3_step(
m_Stmt)) ) {
282 return sqlite3_step(
m_Stmt);
288 #if (SQLITE_VERSION_NUMBER > 3005001)
289 sqlite3_clear_bindings(
m_Stmt);
306 memcpy(m_Buf.data(),
buf,
size);
319 size_t* bytes_read = 0)
338 *
count = m_Buf.size() - m_Pos;
355 unique_ptr<IReader> reader;
356 int size = sqlite3_column_bytes(
stmt.GetStatement(), col);
357 const void*
data = sqlite3_column_blob(
stmt.GetStatement(), col);
367 return reader.release();
374 switch (GetErrCode())
377 case eInitError:
return "eInitError";
378 case eNotImplemented:
return "eNotImplemented";
386 : m_Timeout(7 * (24*60*60))
387 , m_TimeStampFlag(kDefaultTimestampPolicy)
388 , m_VersionFlag(eKeepAll)
424 while ( (
stmt = sqlite3_next_stmt(
m_DB, 0))!=0 ){
425 sqlite3_finalize(
stmt);
430 <<
count <<
" pending statements");
432 _TRACE(
"CSQLITE3_Cache::~CSQLITE3_Cache(): no pending statements");
437 int ret = sqlite3_close(
m_DB);
439 if (ret != SQLITE_OK) {
442 "error closing database '" <<
m_Database <<
"'");
451 LOG_POST(
Info <<
"CSQLITE3_Cache::~CSQLITE3_Cache(): read "
452 << items <<
" items / "
453 << bytes <<
" bytes / "
454 << msec <<
" msec / "
455 << bytes /
double(items) <<
" bytes/item / "
456 << msec /
double(items) <<
" msec/item / "
466 if (!
stmt.Execute() ) {
510 #if (SQLITE_VERSION_NUMBER > 3005001)
512 SQLITE_OPEN_READWRITE | SQLITE_OPEN_CREATE,
517 if (ret != SQLITE_OK) {
518 string msg(
"error opening database '");
527 catch (std::exception& e)
529 LOG_POST(
Error <<
"Attempt to delete damaged DB failed: " << e.what());
542 "failed to set journaling mode");
547 "failed to set temp store");
552 "failed to set page_size");
559 "failed to set synchronous mode");
563 "failed to disable count chnages mode");
576 "CREATE TABLE CacheBlobs " \
578 " key varchar(256) NOT NULL, " \
579 " version int NOT NULL, " \
580 " subkey varchar(256) NOT NULL, " \
582 " timestamp int NOT NULL, " \
585 if ( !
stmt.Execute() ) {
587 "failed to initialize cache");
596 "CREATE UNIQUE INDEX CacheBlobs_pk ON CacheBlobs(key, version, subkey)");
597 if ( !
stmt.Execute() ) {
599 "failed to initialize cache: failed to create PK index");
606 "CREATE INDEX CacheBlobs_timestamp ON CacheBlobs(timestamp)");
607 if ( !
stmt.Execute() ) {
609 "failed to initialize cache: failed to create timestamp index");
638 unsigned int timeout,
639 unsigned int max_timeout)
690 "UPDATE CacheBlobs SET timestamp = ?1 WHERE "
691 "key = ?2 AND version = ?3 AND subkey = ?4";
704 LOG_POST(
Error <<
"failed to update timestamp on cache blob: "
721 unsigned int time_to_live,
730 req->buffer.resize(
size);
731 memcpy(req->buffer.data(),
data,
size);
769 "INSERT OR REPLACE INTO CacheBlobs (key, version, subkey, timestamp, data) "
770 "VALUES( ?1, ?2, ?3, ?4, ?5 )"));
783 <<
sql <<
": [" << ret <<
"] " << sqlite3_errmsg(
m_DB));
794 string sql =
"SELECT data FROM CacheBlobs WHERE ";
797 if (
stmt.Step() == SQLITE_ROW) {
798 return sqlite3_column_bytes(
stmt.GetStatement(), 0);
813 string sql =
"SELECT data FROM CacheBlobs WHERE ";
817 if (
stmt.Step() == SQLITE_ROW) {
818 size_t size = sqlite3_column_bytes(
stmt.GetStatement(), 0);
821 sqlite3_column_blob(
stmt.GetStatement(), 0),
848 "SELECT data FROM CacheBlobs WHERE "
849 "key = ?1 AND version = ?2 AND subkey = ?3"
867 return reader.release();
882 "CSQLITE3_Cache::GetReadStream(key, subkey, version, validity) "
883 "is not implemented");
893 "CSQLITE3_Cache::SetBlobVersionAsValid(key, subkey, version) "
894 "is not implemented");
901 unsigned int time_to_live,
930 size_t* bytes_written = 0)
933 m_Data.insert(m_Data.end(),
934 (
const unsigned char*)(
buf),
935 (
const unsigned char*)(
buf) +
count);
937 *bytes_written =
count;
948 m_Cache.Store(m_Key, m_Version, m_Subkey,
949 &m_Data[0], m_Data.size());
961 vector<unsigned char> m_Data;
974 string sql =
"DELETE FROM CacheBlobs WHERE key = '";
989 string sql =
"DELETE FROM CacheBlobs WHERE ";
1002 string sql =
"SELECT timestamp FROM CacheBlobs WHERE ";
1005 if (
stmt.Step() == SQLITE_ROW) {
1006 return stmt.GetInt(0);
1017 if (access_timeout == 0) {
1023 time_t curr = time_stamp.
GetTimeT();
1027 string sql =
"DELETE FROM CacheBlobs WHERE timestamp < ?1";
1029 stmt.Bind(1,
int(curr));
1030 if (
stmt.Step() == SQLITE_DONE) {
1033 <<
count <<
" items purged");
1040 time_t access_timeout)
1044 if (access_timeout == 0) {
1050 time_t curr = time_stamp.
GetTimeT();
1055 "DELETE FROM CacheBlobs WHERE "
1059 sql +=
" AND key = '";
1065 sql +=
" AND subkey = '";
1070 stmt.Bind(1,
int(curr));
1071 if (
stmt.Step() == SQLITE_DONE) {
1074 <<
count <<
" items purged");
1090 if ( !driver_params ) {
1099 string base1, base2;
1119 "SELECT timestamp FROM CacheBlobs WHERE "
1126 "SELECT timestamp FROM CacheBlobs WHERE "
1127 "key = ?1 AND subkey = ?2"));
1141 return (
stmt->Step() == SQLITE_ROW);
1157 blob_descr->
reader.reset();
1167 int this_timestamp = 0;
1172 "SELECT timestamp, data FROM CacheBlobs WHERE "
1173 "key = ?1 AND version = ?2 AND subkey = ?3";
1186 if (this_timestamp < timeout) {
1194 if (blob_descr->
buf &&
1196 memcpy(blob_descr->
buf,
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string. Sample usage:
thread for delayed writes
SQLITE3 cache implementation.
SQLITE3 ICache exception.
sqlite3_stmt * GetStatement()
void x_Log(int ret, const string &msg)
void Bind(int col_or_id, const void *data, size_t size)
CSQLITE3_Statement & operator>>(int &i)
int GetInt(int col)
return / extract an integer value for a column
CSQLITE3_Statement(sqlite3 *db, const string &sql)
compile a statement
CSQLITE3_Statement(const CSQLITE3_Statement &)
void x_Throw(int ret, const string &msg)
CSQLITE3_Statement & operator=(const CSQLITE3_Statement &)
Reallocable memory buffer (no memory copy overhead) Mimics vector<>, without the overhead of explicit...
void Push(const TValue &elem, const CTimeSpan *timeout=NULL)
Add new element to the end of queue.
TValue Pop(const CTimeSpan *timeout=NULL)
Retrieve an element from the queue.
definition of a Culling tree
EKeepVersions
If to keep already cached versions of the BLOB when storing another version of it (not necessarily a ...
@ eDropOlder
Delete the earlier (than the one being stored) versions of the BLOB.
@ eDropAll
Delete all versions of the BLOB, even those which are newer than the one being stored.
EBlobVersionValidity
BLOB version existence and validity – from the point of view of the underlying cache implementation.
@ fTimeStampOnRead
Timestamp is updated on every access (read or write)
int TTimeStampFlags
Holds a bitwise OR of "ETimeStampFlags".
A very basic data-read interface.
A very basic data-write interface.
The NCBI C++ standard methods for dealing with std::string.
static void DLIST_NAME() remove(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
CNcbiIstream & operator>>(CNcbiIstream &s, const getcontig &c)
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
TValue Get(void) const THROWS_NONE
Get atomic counter value.
CRef< CWriterThread > m_WriterThread
void Stop()
Queue a request to stop the background writer. Asynchronous! Thread may not stop yet when it gets back ...
virtual time_t GetAccessTime(const string &key, int version, const string &subkey)
Return last access time for the specified cache entry.
virtual ~CSQLITE3_Cache()
virtual void GetBlobAccess(const string &key, int version, const string &subkey, SBlobAccessDescr *blob_descr)
Get BLOB access using BlobAccessDescr.
virtual void GetBlobOwner(const string &key, int version, const string &subkey, string *owner)
Retrieve BLOB owner.
unique_ptr< CSQLITE3_Statement > m_Stmt_Store
precompiled statements these are used to speed up time-critical accesses
virtual void SetTimeStampPolicy(TTimeStampFlags policy, unsigned int timeout, unsigned int max_timeout=0)
Set timestamp update policy.
virtual IReader * GetReadStream(const string &key, int version, const string &subkey)
Return sequential stream interface to read BLOB data.
bool Vacuum()
Vacuum the database (should be open first)
void * Main()
Derived (user-created) class must provide a real thread function.
unique_ptr< CSQLITE3_Statement > m_Stmt_GetReadStream
virtual bool Read(const string &key, int version, const string &subkey, void *buf, size_t buf_size)
Fetch the BLOB.
void x_SetTimestamp(const string &key, int version, const string &subkey)
virtual TTimeStampFlags GetTimeStampPolicy() const
Get timestamp policy.
virtual EKeepVersions GetVersionRetention() const
Get version retention.
void SetMemBufferSize(unsigned int buf_size)
Set size of the intermediate BLOB memory buffer.
virtual void Remove(const string &key)
virtual void Store(const string &key, int version, const string &subkey, const void *data, size_t size, unsigned int time_to_live=0, const string &owner=kEmptyStr)
Add or replace BLOB.
virtual void SetVersionRetention(EKeepVersions policy)
Set version retention policy.
string m_Database
filename of the database
void StoreSynchronous(const string &key, int version, const string &subkey, const void *data, size_t size)
TTimeStampFlags m_TimeStampFlag
virtual size_t GetSize(const string &key, int version, const string &subkey)
Check if BLOB exists, return BLOB size.
unique_ptr< CSQLITE3_Statement > m_Stmt_GetBlobAccess
virtual int GetTimeout() const
Get expiration timeout.
virtual void SetBlobVersionAsValid(const string &key, const string &subkey, int version)
virtual bool SameCacheParams(const TCacheParams *params) const
virtual IWriter * GetWriteStream(const string &key, int version, const string &subkey, unsigned int time_to_live=0, const string &owner=kEmptyStr)
A specific of this IWriter implementation is that IWriter::Flush here cannot be called twice,...
const string kSQLITE3_BlobCacheDriverName
unique_ptr< CSQLITE3_Statement > m_Stmt_HasBlobs_key
unique_ptr< CSQLITE3_Statement > m_Stmt_HasBlobs_key_subkey
unique_ptr< CSQLITE3_Statement > m_Stmt_SetTimestamp
virtual void Purge(time_t access_timeout)
Delete all BLOBs older than specified.
virtual string GetCacheName(void) const
CWriterThread(CSQLITE3_Cache &cache, TWriteQueue &write_q)
virtual bool IsOpen() const
virtual bool HasBlobs(const string &key, const string &subkey)
Check if any BLOB exists (any version)
EKeepVersions m_VersionFlag
virtual const char * GetErrCodeString(void) const override
Get error code interpreted as text.
void Open(const string &database, bool remove=false)
Open cache.
@ kDefaultTimestampPolicy
#define NCBI_CURRENT_FUNCTION
Get current function name.
#define LOG_POST(message)
This macro is deprecated and it's strongly recommended to move in all projects (except tests) to macro...
void Error(CExceptionArgs_Base &args)
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
void Warning(CExceptionArgs_Base &args)
virtual const char * GetErrCodeString(void) const
Get error code interpreted as text.
void Info(CExceptionArgs_Base &args)
virtual bool Remove(TRemoveFlags flags=eRecursive) const
Remove a directory entry.
virtual bool Exists(void) const
Check if directory "dirname" exists.
bool Create(TCreateFlags flags=fCreate_Default) const
Create the directory using "dirname" passed in the constructor.
static void SplitPath(const string &path, string *dir=0, string *base=0, string *ext=0)
Split a path string into its basic components.
void Read(CObjectIStream &in, TObjectPtr object, const CTypeRef &type)
void Write(CObjectOStream &out, TConstObjectPtr object, const CTypeRef &type)
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
ERW_Result
Result codes for I/O operations.
@ eRW_Eof
End of data, should be considered permanent.
@ eRW_Error
Unrecoverable error, no retry possible.
@ eRW_Success
Everything is okay, I/O completed.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
time_t GetTimeT(void) const
Get time in time_t format.
void Start(void)
Start the timer.
@ eCurrent
Use current time. See also CCurrentTime.
const TTreeType * FindNode(const TKeyType &key, TNodeSearchMode sflag=eImmediateAndTop) const
Search for node.
const TValue & GetValue(void) const
Return node's value.
use only Cassandra database for the lookups; yes: do not use tables BIOSEQ_INFO and BLOB_PROP in the Cassandra database
const string version
version string
string Execute(const string &cmmd, const vector< string > &args, const string &data=kEmptyStr)
const struct ncbi::grid::netcache::search::fields::SIZE size
const struct ncbi::grid::netcache::search::fields::KEY key
const struct ncbi::grid::netcache::search::fields::SUBKEY subkey
Defines classes: CDirEntry, CFile, CDir, CSymLink, CMemoryFile, CFileUtil, CFileLock,...
Multi-threading – mutexes; rw-locks; semaphore.
Defines: CTimeFormat - storage class for time format.
static SLJIT_INLINE sljit_ins msg(sljit_gpr r, sljit_s32 d, sljit_gpr x, sljit_gpr b)
static IReader * GetBlobReader(CSQLITE3_Statement &stmt, int col)
static void s_MakeKeyCondition(const string &key, int version, const string &subkey, string *out_str)
Add BLOB key specific where condition.
static SStats s_CacheStats
delayed write request object
unique_ptr< IReader > reader
CAtomicCounter total_time
CAtomicCounter objects_read
CAtomicCounter bytes_read