NCBI C++ ToolKit
periodic_sync.hpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 #ifndef NETCACHE__PERIODIC_SYNC__HPP
2 #define NETCACHE__PERIODIC_SYNC__HPP
3 
4 /* $Id: periodic_sync.hpp 78894 2017-07-27 14:39:32Z gouriano $
5  * ===========================================================================
6  *
7  * PUBLIC DOMAIN NOTICE
8  * National Center for Biotechnology Information
9  *
10  * This software/database is a "United States Government Work" under the
11  * terms of the United States Copyright Act. It was written as part of
12  * the author's official duties as a United States Government employee and
13  * thus cannot be copyrighted. This software/database is freely available
14  * to the public for use. The National Library of Medicine and the U.S.
15  * Government have not placed any restriction on its use or reproduction.
16  *
17  * Although all reasonable efforts have been taken to ensure the accuracy
18  * and reliability of the software and data, the NLM and the U.S.
19  * Government do not and cannot warrant the performance or results that
20  * may be obtained by using this software or data. The NLM and the U.S.
21  * Government disclaim all warranties, express or implied, including
22  * warranties of performance, merchantability or fitness for any particular
23  * purpose.
24  *
25  * Please cite the author in any work or product based on this material.
26  *
27  * ===========================================================================
28  *
29  * Authors: Pavel Ivanov
30  *
31  * File Description: Data structures and API to support blobs mirroring.
32  *
33  */
34 
35 #include "sync_log.hpp"
36 #include "nc_db_info.hpp"
37 #include "nc_utils.hpp"
38 #include <set>
39 
40 
42 
43 
44 class CNCPeerControl;
45 class CNCActiveHandler;
46 class SNCCacheData;
47 
48 
51  eServerBusy, //< Clean is in progress
53  eProceedWithEvents, //< The sync can proceed basing on events log
54  eProceedWithBlobs, //< The sync can proceed basing on full list of blobs
55  eUnknownServer //< command came from unknown server
56 };
57 
58 
59 // Front end for periodic synchronization
60 // The interface is for sync initiated by another peer only!
61 // The initiative from the current peer is coming from
62 // CNCActiveSyncControl.
64 {
65 public:
66  static bool Initialize(void);
67  static void ReConfig(void);
68  static void ReInitialize(void);
69  static void Finalize(void);
70 
71  // Initiates synchronization which came from a peer netcache.
72  // It can be rejected if another sync is in progress or there are no
73  // records available anymore
74  static ESyncInitiateResult Initiate(Uint8 server_id,
75  Uint2 slot,
76  Uint8* local_start_rec_no,
77  Uint8* remote_start_rec_no,
78  TReducedSyncEvents* events,
79  Uint8* sync_id);
80 
81  // Cancels the current sync
82  static void Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id);
83 
85  Uint2 slot,
86  bool can_abort,
87  Uint8& sync_id);
88  static void MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id);
89  static void SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id);
90 
91  // Completes the current sync transaction
92  static void Commit(Uint8 server_id,
93  Uint2 slot,
94  Uint8 sync_id,
95  Uint8 local_synced_rec_no,
96  Uint8 remote_synced_rec_no);
97 };
98 
99 
100 
102 {
103  eIdle, // Vacant for something
104  eSyncInProgress, // Periodic sync is in progress (could be events based or blob lists based)
105  eCleanInProgress // Events log cleaning is in progress
106 };
107 
114 };
115 
125 };
126 
133 };
134 
135 #define NC_SYNC_HINT __LINE__
136 
138 {
154  int hint;
155 
157 };
158 
159 typedef vector<SSyncSlotSrv*> TSlotSrvsList;
160 
161 
163 {
168  bool cleaning;
170 
172 };
173 
175 typedef vector<SSyncSlotData*> TSyncSlotsList;
176 
177 
178 typedef TSyncEvents::const_iterator TSyncEventsIt;
180 
182 {
188 };
189 
190 
191 /*
192  manages synchronization (blob data and metadata) between NC servers
193 
194  begin: x_StartScanSlots
195 
196  -> x_StartScanSlots: pick first slot, goto x_CheckSlotOurSync
197 
198  -> x_CheckSlotOurSync
199  when all slots processed, goto x_FinishScanSlots
200  for a given slot, check if it is time to sync; if yes, goto x_DoPeriodicSync
201  goto x_CheckSlotTheirSync
202 
203  -> x_CheckSlotTheirSync
204  check if there is a sync on this slot that was started by somebody else and has been inactive for too long
205  if yes, cancel it
206  pick next slot, goto x_CheckSlotOurSync
207 
208  -> x_FinishScanSlots
209  calc when to run next time
210  goto x_StartScanSlots
211 
212  -> x_DoPeriodicSync:
213  once again, check that the sync does not start from both sides at the same time
214  if so, goto x_CheckSlotTheirSync
215  get CNCActiveHandler which sends start sync (execute SyncStart command)
216  ok ? x_WaitSyncStarted : x_FinishSync
217 
218  -> x_WaitSyncStarted
219  wait for sync started (check m_StartedCmds)
220  NCActiveHandler will report command result using CmdFinished() method
221  depending on the reply, goto x_PrepareSyncByBlobs, or goto x_PrepareSyncByEvents
222 
223  -> x_PrepareSyncByEvents
224  another server has sent us list of events,
225  now give this list to CNCSyncLog, which will give us the difference (m_Events2Get,m_Events2Send)
226  if list not empty, goto x_ExecuteSyncCommands
227 
228  if CNCSyncLog cannot sync event lists (e.g., some of our info is lost), request blob list
229  goto x_WaitForBlobList
230 
231  -> x_WaitForBlobList
232  once blob list received, goto x_PrepareSyncByBlobs
233 
234  -> x_PrepareSyncByBlobs
235  re-fill list of local blobs (in given slot)
236  goto x_ExecuteSyncCommands
237 
238  -> x_ExecuteSyncCommands
239  if no more commands, goto x_ExecuteFinalize
240  if network error, goto x_FinishSync
241  goto x_WaitForExecutingTasks
242 
243  -> x_ExecuteFinalize
244  on success, finish sync, goto x_WaitForExecutingTasks
245  on error, goto x_FinishSync
246 
247  -> x_WaitForExecutingTasks
248  woken up by CmdFinished
249  if all commands are executed ok, and need 'commit', goto x_ExecuteFinalize
250  after commit is done, goto x_FinishSync
251 
252  -> x_FinishSync
253  log report
254  CommitSync
255  goto x_CheckSlotTheirSync
256 
257 */
258 
// Controller for periodic synchronization that is initiated by the current
// peer. It is a CSrvStatesTask, i.e. it runs as a state machine: every
// private method returning State is one state, and the transitions between
// them are described in the state-flow comment that precedes this class.
// NOTE(review): this listing has gaps in its numbering, so some members
// (typedefs, data members, possibly methods) are not shown here.
259 class CNCActiveSyncControl : public CSrvStatesTask<CNCActiveSyncControl>
260 {
261 public:
262  CNCActiveSyncControl(void);
263  virtual ~CNCActiveSyncControl(void);
264 
     // Slot this controller works on.
     // NOTE(review): the inline section of this header appears to define it
     // as returning m_Slot - confirm.
265  Uint2 GetSyncSlot(void);
     // Receives the peer's answer to the "sync start" request: the starting
     // record numbers on both sides and whether the sync goes by blob lists
     // (by_blobs) rather than by events.
266  void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs);
     // Adds one event / one blob summary received from the peer while the
     // sync is starting.
     // NOTE(review): per the inline section of this header both appear to
     // return false, without storing anything, when m_Result != eSynOK, and
     // true otherwise - confirm.
267  bool AddStartEvent(SNCSyncEvent* evt);
268  bool AddStartBlob(const string& key, SNCBlobSummary* blob_sum);
     // Fills task_info with the next sync task. is_valid is an optional
     // output (defaults to nullptr). Exact semantics of the return value and
     // of is_valid are defined in the .cpp - not visible from this header.
269  bool GetNextTask(SSyncTaskInfo& task_info, bool* is_valid = nullptr);
     // Executes the given task using connection 'conn'.
270  void ExecuteSyncTask(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
     // Callback through which CNCActiveHandler reports the result of a
     // command (see x_WaitSyncStarted / x_WaitForExecutingTasks in the
     // state-flow comment above the class).
     // NOTE(review): 'hint' presumably carries an NC_SYNC_HINT (__LINE__)
     // value identifying the call site - confirm.
271  void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler* conn, int hint);
     // Returns the current value of the m_Stuck flag.
272  bool IsStuck(void) const {
273  return m_Stuck;
274  }
     // Sets the m_First flag (to true when called without arguments).
275  void SetFirst(bool f = true) {
276  m_First = f;
277  }
     // Writes state information into sendBuff; 'mask' is a selector whose
     // meaning is defined by the implementation (not visible here).
278  static void PrintState(TNCBufferType& sendBuff, const CTempString& mask);
279 
280 private:
     // States of the state machine; see the state-flow comment above the
     // class for what each one does and where it goes next.
281  State x_StartScanSlots(void);
282  State x_CheckSlotOurSync(void);
283  State x_CheckSlotTheirSync(void);
284  State x_FinishScanSlots(void);
285  State x_DoPeriodicSync(void);
286  State x_WaitSyncStarted(void);
287  State x_PrepareSyncByEvents(void);
288  State x_WaitForBlobList(void);
289  State x_PrepareSyncByBlobs(void);
290  State x_ExecuteSyncCommands(void);
291  State x_ExecuteFinalize(void);
292  State x_WaitForExecutingTasks(void);
293  State x_FinishSync(void);
     // Non-state helpers (they return void, not State).
294  void x_CleanSyncObjects(void);
295  void x_CalcNextTask(void);
     // Per-task workers, each taking the task description and the
     // connection to run it on.
     // NOTE(review): presumably dispatched from ExecuteSyncTask by task
     // type - confirm in the .cpp.
296  void x_DoEventSend(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
297  void x_DoEventGet(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
298  void x_DoBlobUpdateOur(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
299  void x_DoBlobUpdatePeer(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
300  void x_DoBlobSend(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
301  void x_DoBlobGet(const SSyncTaskInfo& task_info, CNCActiveHandler* conn);
303 
304 
     // NOTE(review): presumably the most recent NC_SYNC_HINT value - confirm.
311  int m_Hint;
315 
339  bool m_DidSync;
     // Value reported by IsStuck().
341  bool m_Stuck;
     // Value set by SetFirst().
342  bool m_First;
     // Position in a TSyncSlotsList.
     // NOTE(review): presumably the next slot to examine while scanning
     // slots - confirm.
346  TSyncSlotsList::const_iterator m_NextSlotIt;
351 };
352 
353 
// Server task (derived from CSrvTask).
// NOTE(review): judging by its name and by the eCleanInProgress status
// ("Events log cleaning is in progress"), it presumably cleans the sync
// events log; the implementation is not visible in this header - confirm.
354 class CNCLogCleaner : public CSrvTask
355 {
356 public:
357  CNCLogCleaner(void);
358  virtual ~CNCLogCleaner(void);
359 
360 private:
     // Virtual work method of the task; thr_idx is a task-server thread
     // number (TSrvThreadNum).
361  virtual void ExecuteSlice(TSrvThreadNum thr_idx);
362 
363 
     // Position in a TSyncSlotsList.
     // NOTE(review): presumably the next slot to process - confirm.
364  TSyncSlotsList::const_iterator m_NextSlotIt;
366 };
367 
368 
369 
370 
371 //////////////////////////////////////////////////////////////////////////
372 // Inline functions
373 //////////////////////////////////////////////////////////////////////////
374 
375 inline Uint2
377 {
378  return m_Slot;
379 }
380 
381 inline void
383  Uint8 remote_rec_no,
384  bool by_blobs)
385 {
386  m_LocalStartRecNo = local_rec_no;
387  m_RemoteStartRecNo = remote_rec_no;
388  m_SlotSrv->is_by_blobs = by_blobs;
389 }
390 
391 inline bool
393 {
394  if (m_Result != eSynOK) {
395  return false;
396  }
397  if (evt->event_type == eSyncProlong)
398  m_RemoteEvents[evt->key.PackedKey()].prolong_event = evt;
399  else
400  m_RemoteEvents[evt->key.PackedKey()].wr_or_rm_event = evt;
401  return true;
402 }
403 
404 inline bool
406 {
407  if (m_Result != eSynOK) {
408  return false;
409  }
410  m_RemoteBlobs[key] = blob_sum;
411  return true;
412 }
413 
415 
416 
417 #endif /* NETCACHE__PERIODIC_SYNC__HPP */
418 
ncbi::TMaskedQueryRegions mask
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
Definition: srv_sync.hpp:193
State x_WaitSyncStarted(void)
void x_DoEventSend(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
void x_DoBlobSend(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
TSyncSlotsList::const_iterator m_NextSlotIt
void x_DoBlobGet(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_ExecuteSyncCommands(void)
TSyncEventsIt m_CurGetEvent
static void PrintState(TNCBufferType &sendBuff, const CTempString &mask)
State x_WaitForExecutingTasks(void)
State x_PrepareSyncByEvents(void)
void x_DoFinalize(CNCActiveHandler *conn)
void x_DoEventGet(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_CheckSlotOurSync(void)
bool GetNextTask(SSyncTaskInfo &task_info, bool *is_valid=nullptr)
TReducedSyncEvents m_RemoteEvents
bool AddStartEvent(SNCSyncEvent *evt)
bool AddStartBlob(const string &key, SNCBlobSummary *blob_sum)
State x_PrepareSyncByBlobs(void)
virtual ~CNCActiveSyncControl(void)
void SetFirst(bool f=true)
void CmdFinished(ESyncResult res, ESynActionType action, CNCActiveHandler *conn, int hint)
void StartResponse(Uint8 local_rec_no, Uint8 remote_rec_no, bool by_blobs)
void ExecuteSyncTask(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
State x_CheckSlotTheirSync(void)
State x_ExecuteFinalize(void)
void x_DoBlobUpdatePeer(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
SSyncSlotSrv * m_SlotSrv
State x_FinishScanSlots(void)
State x_StartScanSlots(void)
State x_DoPeriodicSync(void)
TBlobsListIt m_CurLocalBlob
bool IsStuck(void) const
SSyncSlotData * m_SlotData
TNCBlobSumList m_LocalBlobs
set< SSyncSlotSrv * > m_VisitedSrv
set< CNCActiveHandler * > m_SyncHandlers
void x_DoBlobUpdateOur(const SSyncTaskInfo &task_info, CNCActiveHandler *conn)
TBlobsListIt m_CurRemoteBlob
TNCBlobSumList m_RemoteBlobs
State x_WaitForBlobList(void)
TSyncEventsIt m_CurSendEvent
const string & PackedKey(void) const
Definition: netcached.hpp:180
map< Uint2, Uint8 > m_LastForceTime
virtual ~CNCLogCleaner(void)
virtual void ExecuteSlice(TSrvThreadNum thr_idx)
This is the main method to do all work this task should do.
TSyncSlotsList::const_iterator m_NextSlotIt
static void ReInitialize(void)
static ESyncInitiateResult Initiate(Uint8 server_id, Uint2 slot, Uint8 *local_start_rec_no, Uint8 *remote_start_rec_no, TReducedSyncEvents *events, Uint8 *sync_id)
static void MarkCurSyncByBlobs(Uint8 server_id, Uint2 slot, Uint8 sync_id)
static void Finalize(void)
static void Commit(Uint8 server_id, Uint2 slot, Uint8 sync_id, Uint8 local_synced_rec_no, Uint8 remote_synced_rec_no)
static bool Initialize(void)
static void Cancel(Uint8 server_id, Uint2 slot, Uint8 sync_id)
static void ReConfig(void)
static ESyncInitiateResult CanStartSyncCommand(Uint8 server_id, Uint2 slot, bool can_abort, Uint8 &sync_id)
static void SyncCommandFinished(Uint8 server_id, Uint2 slot, Uint8 sync_id)
Special task which executes as finite state machine.
Definition: srv_tasks.hpp:283
Main working entity in TaskServer.
Definition: srv_tasks.hpp:88
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
static bool is_valid(const char *num, int type, CONV_RESULT *cr)
static CS_CONNECTION * conn
Definition: ct_dynamic.c:25
uint8_t Uint1
1-byte (8-bit) unsigned integer
Definition: ncbitype.h:99
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
uint16_t Uint2
2-byte (16-bit) unsigned integer
Definition: ncbitype.h:101
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
const struct ncbi::grid::netcache::search::fields::KEY key
double f(double x_, const double &y_)
Definition: njn_root.hpp:188
ESyncStatus
@ eSyncInProgress
@ eCleanInProgress
@ eIdle
vector< SSyncSlotSrv * > TSlotSrvsList
ESynActionType
@ eSynActionNone
@ eSynActionProlong
@ eSynActionRead
@ eSynActionRemove
@ eSynActionWrite
ESyncInitiateResult
@ eProceedWithEvents
@ eServerBusy
@ eCrossSynced
@ eNetworkError
@ eUnknownServer
@ eProceedWithBlobs
TNCBlobSumList::const_iterator TBlobsListIt
ESynTaskType
@ eSynNoTask
@ eSynNeedFinalize
@ eSynBlobGet
@ eSynEventSend
@ eSynBlobUpdatePeer
@ eSynEventGet
@ eSynBlobSend
@ eSynBlobUpdateOur
vector< SSyncSlotData * > TSyncSlotsList
TSyncEvents::const_iterator TSyncEventsIt
map< Uint2, SSyncSlotData * > TSyncSlotsMap
ESyncResult
@ eSynServerBusy
@ eSynNetworkError
@ eSynOK
@ eSynCrossSynced
@ eSynAborted
Single event record.
Definition: sync_log.hpp:52
CNCBlobKeyLight key
Definition: sync_log.hpp:55
ENCSyncEvent event_type
Definition: sync_log.hpp:56
TSlotSrvsList srvs
CMiniMutex lock
SSyncSlotData(Uint2 slot)
Uint8 last_success_time
ESyncResult result
Uint8 last_active_time
CMiniMutex lock
CNCPeerControl * peer
SSyncSlotSrv(CNCPeerControl *peer)
TSyncEventsIt get_evt
TBlobsListIt local_blob
TBlobsListIt remote_blob
ESynTaskType task_type
TSyncEventsIt send_evt
@ eSyncProlong
Definition: sync_log.hpp:45
list< SNCSyncEvent * > TSyncEvents
Definition: sync_log.hpp:91
Uint2 TSrvThreadNum
Type for thread number in TaskServer.
Definition: task_server.hpp:42
Modified on Wed Apr 17 13:08:18 2024 by modify_doxy.py rev. 669887