NCBI C++ ToolKit
sync_log.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: sync_log.cpp 73070 2016-06-17 13:05:41Z gouriano $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Denis Vakatov, Pavel Ivanov, Sergey Satskiy
27  *
28  * File Description: API to support blobs synchronization
29  *
30  */
31 
32 #include "nc_pch.hpp"
33 
34 #include <corelib/ncbidbg.hpp>
35 
36 #include "netcached.hpp"
37 #include "sync_log.hpp"
38 #include "distribution_conf.hpp"
39 #include "task_server.hpp"
40 
41 
42 
44 
45 
46 struct SSlotData
47 {
51 
52  SSlotData(void) : rec_number(0)
53  {}
54 
55  SSlotData(const SSlotData& other) : rec_number(0)
56  {
57  if (other.rec_number != 0 || !other.events.empty()) {
58  SRV_FATAL("Invalid state");
59  }
60  }
61 };
62 
63 
65 {
68 
70  {}
71 };
72 
73 
76 static TLog s_Log;
82 
83 
84 // File IO supporting structures
85 static const size_t kMaxKeyLength = 1024;
86 
87 struct SFixedPart
88 {
89  Uint8 rec_no; //< Local event sequential number.
90  ENCSyncEvent event_type; //< Event type (write, remove, prolong).
91  Uint2 slot; //< Key slot number.
92  Uint8 orig_time; //< Timestamp of the event when
93  //< it originated by client.
94  Uint8 orig_server; //< The server where event has
95  //< been originated.
96  Uint8 orig_rec_no; //< Record number on the host where the
97  //< event was originated.
98  Uint8 local_time; //< Timestamp when the record was
99  //< recorded locally.
100 };
101 
103 {
108 };
109 
110 
111 // Reads the beginning of the file where the information about last synced
112 // records per pair server <--> slot is saved.
113 // Returns true if this information is read successfully.
114 static bool
116 {
117  size_t count = 0;
118 
119  // Read the number of pairs server <--> slot
120  if (fread(&count, sizeof(count), 1, file) != 1) {
121  SRV_LOG(Critical, "Cannot read the number of saved pairs(server <--> slot) "
122  "with the last synced record numbers. Invalid file?");
123  return false;
124  }
125 
126  for (; count > 0; --count) {
127  SFileServRecord record;
128 
129  if (fread(&record, sizeof(record), 1, file) != 1) {
130  SRV_LOG(Critical, "Cannot read last synced record numbers. Invalid file?");
131  return false;
132  }
133  SSrvSyncedData& sync_data = s_SyncedData[record.key_server]
134  [record.key_slot];
135  sync_data.local_rec_no = record.local_rec_no;
136  sync_data.remote_rec_no = record.remote_rec_no;
137  }
138 
141  bool valid = false;
142  if (peers.find(it_srv->first) != peers.end()) {
143  const vector<Uint2>& slots = CNCDistributionConf::GetCommonSlots(it_srv->first);
144  if (!slots.empty()) {
145  valid = true;
146  TSrvSyncedMap& srv_map = it_srv->second;
147  ERASE_ITERATE(TSrvSyncedMap, it_slot, srv_map) {
148  if (find(slots.begin(), slots.end(), it_slot->first)
149  == slots.end())
150  {
151  srv_map.erase(it_slot);
152  }
153  }
154  }
155  }
156  if (!valid)
157  s_SyncedData.erase(it_srv);
158  }
159 
160  return true;
161 }
162 
163 
164 // Reads and saves a single saved event record.
165 // Returns true if the event was read successfully.
166 static bool
168 {
169  // Read the fixed part
170  SFixedPart record;
171  if (fread(&record, sizeof(record), 1, file) != 1)
172  return false;
173 
174  // Read the key size
175  size_t key_size;
176  if (fread(&key_size, sizeof(key_size), 1, file) != 1)
177  return false;
178 
179  // Read the key
180  char key[kMaxKeyLength];
181  if (fread(key, key_size, 1, file) != 1)
182  return false;
183 
184  // Insert the corresponding record
185  SNCSyncEvent* new_record = new SNCSyncEvent;
186 
187  new_record->rec_no = record.rec_no;
188  new_record->event_type = record.event_type;
189  new_record->orig_time = record.orig_time;
190  new_record->orig_server = record.orig_server;
191  new_record->orig_rec_no = record.orig_rec_no;
192  new_record->local_time = record.local_time;
193  new_record->key = CTempString(key, key_size);
194 
195  s_Log[record.slot].events.push_back(new_record);
196  return true;
197 }
198 
199 
200 // Writes the beginning of the file where the information about last synced
201 // records per pair server <--> slot is saved.
202 // Returns true if this information is written successfully.
203 static bool
205 {
206  size_t count = 0;
208  count += it_srv->second.size();
209  }
210 
211  if (fwrite(&count, sizeof(count), 1, file) != 1)
212  return false;
213 
215  ITERATE(TSrvSyncedMap, it_slot, it_srv->second) {
216  SFileServRecord record;
217  record.key_server = it_srv->first;
218  record.key_slot = it_slot->first;
219  record.local_rec_no = it_slot->second.local_rec_no;
220  record.remote_rec_no = it_slot->second.remote_rec_no;
221 
222  if (fwrite(&record, sizeof(record), 1, file) != 1)
223  return false;
224  }
225  }
226  return true;
227 }
228 
229 
230 // Writes a single event record.
231 // Returns true if the event is written successfully.
232 static bool
233 s_WriteRecord(FILE* file, Uint2 slot, const SNCSyncEvent* event)
234 {
235  SFixedPart record;
236  memset(&record, 0, sizeof(record));
237  record.rec_no = event->rec_no;
238  record.event_type = event->event_type;
239  record.slot = slot;
240  record.orig_time = event->orig_time;
241  record.orig_server = event->orig_server;
242  record.orig_rec_no = event->orig_rec_no;
243  record.local_time = event->local_time;
244 
245  // Write fixed size fields
246  if (fwrite(&record, sizeof(record), 1, file) != 1)
247  return false;
248 
249  // Write the key size
250  size_t key_size = event->key.PackedKey().size();
251  if (fwrite(&key_size, sizeof(key_size), 1, file) != 1)
252  return false;
253 
254  // Write the key
255  if (fwrite(event->key.PackedKey().data(), key_size, 1, file) != 1)
256  return false;
257 
258  return true;
259 }
260 
261 static inline SSlotData&
263 {
265  return s_Log[slot];
266 }
267 
268 
269 // Provides the minimum record number till which the synchronization is done
270 // with all the servers
271 static Uint8
273 {
275 
276  Uint8 min_rec_no = (data.events.empty()? s_LastWrittenRecord
277  : data.events.front()->rec_no);
280  SSrvSyncedData& sync_data = it_srv->second[slot];
281  if (sync_data.local_rec_no >= min_rec_no
282  && sync_data.local_rec_no < result)
283  {
284  result = sync_data.local_rec_no;
285  }
286  }
287  return result;
288 }
289 
290 
291 // The prolong event is found in the src interval
292 static void
293 s_ProcessProlong(const SBlobEvent& src_event,
294  const SBlobEvent& other_event,
295  TSyncEvents* diff)
296 {
297  if (other_event.prolong_event != NULL) {
298  if (other_event.prolong_event->orig_server == src_event.prolong_event->orig_server
299  && other_event.prolong_event->orig_rec_no == src_event.prolong_event->orig_rec_no)
300  {
301  return;
302  }
303 
304  if (other_event.prolong_event->isOlder(*src_event.prolong_event)
305  && ((src_event.wr_or_rm_event == NULL)
306  || (src_event.wr_or_rm_event != NULL
307  && other_event.wr_or_rm_event != NULL
308  && other_event.wr_or_rm_event->orig_server == src_event.wr_or_rm_event->orig_server
309  && other_event.wr_or_rm_event->orig_rec_no == src_event.wr_or_rm_event->orig_rec_no)
310  || (src_event.wr_or_rm_event != NULL
311  && other_event.wr_or_rm_event != NULL
312  && src_event.wr_or_rm_event->isOlder(*other_event.wr_or_rm_event))))
313  {
314  diff->push_back(src_event.prolong_event);
315  }
316  }
317  else if (other_event.wr_or_rm_event != NULL) {
318  if (other_event.wr_or_rm_event->event_type == eSyncWrite) {
319  if (other_event.wr_or_rm_event->isOlder(*src_event.prolong_event)
320  && ((src_event.wr_or_rm_event == NULL)
321  || (src_event.wr_or_rm_event != NULL
322  && other_event.wr_or_rm_event->orig_server == src_event.wr_or_rm_event->orig_server
323  && other_event.wr_or_rm_event->orig_rec_no == src_event.wr_or_rm_event->orig_rec_no)
324  || (src_event.wr_or_rm_event != NULL
325  && src_event.wr_or_rm_event->isOlder(*other_event.wr_or_rm_event))))
326  {
327  diff->push_back(src_event.prolong_event);
328  }
329  }
330  }
331 }
332 
333 static void
335  const SBlobEvent& other_event,
336  TSyncEvents* diff)
337 {
338  if (other_event.wr_or_rm_event != NULL) {
339  // If there is write or remove it does not matter if there was
340  // prolong or not
341  if ((other_event.wr_or_rm_event->orig_server != src_event->orig_server
342  || other_event.wr_or_rm_event->orig_rec_no != src_event->orig_rec_no)
343  && other_event.wr_or_rm_event->isOlder(*src_event))
344  {
345  // Take the most recent event
346  diff->push_back(src_event);
347  return;
348  }
349  }
350  else {
351  // This is lone prolong
352  diff->push_back(src_event);
353  }
354 }
355 
356 
357 static bool
359  TReducedSyncEvents::const_iterator& current_iterator,
360  const string& key)
361 {
362  // It is known that the container is sorted by keys, so
363  for (; current_iterator != container.end(); ++current_iterator) {
364  if (current_iterator->first == key)
365  return true;
366  if (current_iterator->first > key)
367  return false;
368  }
369  return false;
370 }
371 
372 
373 // Here is how the decision is made:
374 // | User RM | WR | Prolong <-- found in src interval
375 // ----------------------------------------------------------
376 // User RM | none | if WR later | none
377 // | | => take WR |
378 // WR | if RM later | if WRsrc later | if PR later
379 // | => take RM | => take WRsrc | => take PR
380 // Prolong | * | ** | if PRsrc later
381 // | | | => take PRsrc
382 // ^- found in other
383 //
384 // * - if it was a lonely prolong => take User RM
385 // if there was WRother => take User RM if it is later than
386 // WRother
387 // ** - if it was a lonely prolong => take WR
388 // if there was WRother => take WRsrc if it is later
389 static void
391  Uint8 start_rec_no,
392  Uint8 now,
393  const TReducedSyncEvents& other,
394  Uint8* synced_rec_no,
395  TSyncEvents* diff)
396 {
398  TReducedSyncEvents::const_iterator src_end = src.end();
399  Uint8 max_rec_no = 0;
401  TReducedSyncEvents::const_iterator other_event = other.begin();
402 
403  for (; k != src_end; ++k) {
404  Uint8 op_rec_no = k->second.getMaxRecNoWithinTimeLimit(time_limit);
405  if (op_rec_no <= start_rec_no)
406  continue;
407 
408  // Update the synced record #
409  if (op_rec_no > max_rec_no)
410  max_rec_no = op_rec_no;
411 
412  if (s_SpecialFind(other, other_event, k->first) == false) {
413  // No operations found with this blob - take the ops
414  if (k->second.wr_or_rm_event != NULL
415  && (k->second.wr_or_rm_event->rec_no > start_rec_no))
416  {
417  diff->push_back(k->second.wr_or_rm_event);
418  }
419  else if (k->second.prolong_event != NULL
420  && (k->second.prolong_event->local_time < time_limit))
421  {
422  diff->push_back(k->second.prolong_event);
423  }
424  continue;
425  }
426 
427  // Found on the other side, let's make the decision
428  if (k->second.wr_or_rm_event != NULL
429  && (k->second.wr_or_rm_event->rec_no > start_rec_no))
430  {
431  s_ProcessWrite(k->second.wr_or_rm_event, other_event->second, diff);
432  }
433  if (k->second.prolong_event != NULL
434  && (k->second.prolong_event->local_time < time_limit))
435  {
436  s_ProcessProlong(k->second, other_event->second, diff);
437  }
438  }
439 
440  if (max_rec_no != 0)
441  *synced_rec_no = max_rec_no;
442  else
443  *synced_rec_no = start_rec_no;
444 }
445 
446 
447 // This call is guaranteed to be done once in one thread only.
448 // So there is no need to bother about thread safety.
449 void
450 CNCSyncLog::Initialize(bool need_read_saved, Uint8 start_log_rec_no)
451 {
452  s_TotalRecords.Set(0);
454 
456  if (!need_read_saved || file_name.empty()) {
457  // No need to load the log from the file.
458  // Start from the given record number.
459  s_LastWrittenRecord = start_log_rec_no;
460  return;
461  }
462 
463  // Need to load the log from the file and identify the record number to
464  // start with.
465  FILE* log_file = fopen(file_name.c_str(), "r");
466 
467  if (!log_file) {
468  SRV_LOG(Warning, "Cannot open file: " << file_name);
469 
470  // Could not read the file and therefore could not identify the
471  // start record number. So use the given
472  s_LastWrittenRecord = start_log_rec_no;
473  return;
474  }
475 
476  // Read the server records
477  if (!s_ReadHeader(log_file)) {
478  // Error occurred, the file is broken
480  s_LastWrittenRecord = start_log_rec_no;
481  return;
482  }
483 
484  // Read the log records
485  Uint4 recs_count = 0;
486  while (s_ReadRecord(log_file))
487  ++recs_count;
488  s_TotalRecords.Set(recs_count);
489 
490  // Check for the errors
491  if (!feof(log_file)) {
492  SRV_LOG(Critical, "Cannot read records from " << file_name
493  << ". Invalid file?");
494 
495  // Error occurred, the file is broken:
496  // Reset the counter, clean the log container, set the start number.
497  s_LastWrittenRecord = start_log_rec_no;
498  s_TotalRecords.Set(0);
500  s_Log.clear();
501 
502  fclose(log_file);
503  return;
504  }
505  fclose(log_file);
506 
507  // Set the last written record number
508  NON_CONST_ITERATE(TLog, slot, s_Log) {
509  SSlotData& data = slot->second;
510  data.rec_number = data.events.size();
511  if (data.rec_number != 0) {
512  Uint8 last_rec_no = data.events.back()->rec_no;
513  if (last_rec_no > s_LastWrittenRecord)
514  s_LastWrittenRecord = last_rec_no;
515  }
516  }
517 }
518 
519 // This call is guaranteed to be done once in one thread only.
520 // So there is no need to bother about thread safety.
521 bool
523 {
525  if (file_name.empty()) {
526  return false;
527  }
528  FILE* log_file = fopen(file_name.c_str(), "w");
529 
530  if (!log_file)
531  return false;
532 
533  // Avoid saving excessive data
534  ITERATE(TLog, it, s_Log) {
535  Clean(it->first);
536  }
537 
538  // Save the last synced records
539  if (!s_WriteHeader(log_file)) {
540  fclose(log_file);
541  return false;
542  }
543 
544  // Save the log entries
545  ITERATE(TLog, it_slot, s_Log) {
546  const TSyncEvents& events = it_slot->second.events;
547  ITERATE(TSyncEvents, item, events) {
548  if (!s_WriteRecord(log_file, it_slot->first, (*item))) {
549  fclose(log_file);
550  return false;
551  }
552  }
553  }
554  fclose(log_file);
555  return true;
556 }
557 
558 Uint8
560 {
561  Uint2 real_slot = 0;
562  Uint2 time_bucket = 0;
564  event->key.PackedKey(), real_slot, time_bucket) || real_slot != slot) {
565  SRV_FATAL("Slot verification failed, blob key: " << event->key.PackedKey() <<
566  ", expected slot: " << slot << ", calculated slot: " << real_slot);
567  }
568 
569  SSlotData& data = s_GetSlotData(slot);
570  CMiniMutexGuard guard(data.lock);
571 
572  event->local_time = CSrvTime::Current().AsUSec();
573  s_GlobalLock.Lock();
574  event->rec_no = ++s_LastWrittenRecord;
576  // Avoid race condition:
577  // - user blob comes
578  // - event is written
579  // - sync started in another thread while orig_rec_no is updated
581  event->orig_rec_no = event->rec_no;
582 
583  data.events.push_back(event);
584  ++data.rec_number;
585  s_TotalRecords.Add(1);
586  return event->rec_no;
587 }
588 
589 void
591  Uint2 slot,
592  Uint8* local_synced_rec_no,
593  Uint8* remote_synced_rec_no)
594 {
596 
597  SSrvSyncedData& sync_data = s_SyncedData[server][slot];
598  *local_synced_rec_no = sync_data.local_rec_no;
599  *remote_synced_rec_no = sync_data.remote_rec_no;
600 }
601 
602 void
604  Uint2 slot,
605  Uint8 local_synced_rec_no,
606  Uint8 remote_synced_rec_no)
607 {
609 
610  SSrvSyncedData& sync_data = s_SyncedData[server][slot];
611  sync_data.local_rec_no = local_synced_rec_no;
612  sync_data.remote_rec_no = remote_synced_rec_no;
613 }
614 
615 Uint8
617 {
618  SSlotData& data = s_GetSlotData(slot);
619  CMiniMutexGuard guard(data.lock);
620  if (!data.events.empty())
621  return data.events.back()->rec_no;
622  else
623  return s_LastWrittenRecord;
624 }
625 
626 Uint8
628 {
629  return s_LastWrittenRecord;
630 }
631 
632 bool
634  Uint2 slot,
635  Uint8* local_start_rec_no,
636  Uint8* remote_start_rec_no,
637  TReducedSyncEvents* events)
638 {
639  {{
641  SSrvSyncedData& sync_data = s_SyncedData[server][slot];
642  if (sync_data.local_rec_no > *local_start_rec_no)
643  *local_start_rec_no = sync_data.local_rec_no;
644  else
645  sync_data.local_rec_no = *local_start_rec_no;
646  if (sync_data.remote_rec_no > *remote_start_rec_no)
647  *remote_start_rec_no = sync_data.remote_rec_no;
648  else
649  sync_data.remote_rec_no = *remote_start_rec_no;
650  }}
651 
652  SSlotData& data = s_GetSlotData(slot);
653  CMiniMutexGuard guard(data.lock);
654 
655  // Check the presence of the local record
656  if (data.events.empty()
657  || *local_start_rec_no < data.events.front()->rec_no
658  || s_LastWrittenRecord < *local_start_rec_no)
659  {
660  return false; // The required record is not available,
661  // all the blobs will be exchanged
662  }
663 
664  //Uint8 time_limit = 0;
665  //bool time_limit_set = false;
666  // Walk the container from the end with copying all the records which
667  // matched: rec_no > local_rec_no or timestamp within 10 sec
668  NON_CONST_REVERSE_ITERATE(TSyncEvents, record, data.events) {
669  SNCSyncEvent* evt = *record;
670  // local_time is used here because the records are ordered by
671  // local_time but not the origin time
672  //if (time_limit_set && evt->local_time < time_limit)
673  // break;
674  if (/*!time_limit_set && */evt->rec_no < *local_start_rec_no) {
675  //time_limit_set = true;
676  //time_limit = evt->local_time
677  // - CNCDistributionConf::GetPeriodicSyncTailTime();
678  break;
679  }
680 
681  // Now make the decision if the record should be memorized
682  // | rm | wr | pr <-- was saved
683  // ---------------------------------------------
684  // rm (any) | impossible | nothing | impossible
685  // wr | nothing | nothing | memorize
686  // pr | nothing | nothing | nothing
687  // ^- found
688  SBlobEvent& blob_event = (*events)[evt->key.PackedKey()];
689  switch (evt->event_type) {
690  case eSyncUpdate:
691  case eSyncRemove:
692  break;
693  case eSyncWrite:
694  if (!blob_event.wr_or_rm_event)
695  blob_event.wr_or_rm_event = evt;
696  break;
697  case eSyncProlong:
698  if (!blob_event.wr_or_rm_event && !blob_event.prolong_event)
699  blob_event.prolong_event = evt;
700  break;
701  }
702  }
703  return true;
704 }
705 
706 
707 bool
709  Uint2 slot,
710  Uint8 local_start_rec_no,
711  Uint8 remote_start_rec_no,
712  const TReducedSyncEvents& remote_events,
713  TSyncEvents* events_to_get,
714  TSyncEvents* events_to_send,
715  Uint8* local_synced_rec_no,
716  Uint8* remote_synced_rec_no)
717 {
718  // Get the local events
719  TReducedSyncEvents local_events;
720  if (!GetEventsList(server, slot, &local_start_rec_no,
721  &remote_start_rec_no, &local_events))
722  {
723  return false;
724  }
725 
726  Uint8 now = CSrvTime::Current().AsUSec();
727  s_CompareEvents(local_events, local_start_rec_no,
728  now, remote_events,
729  local_synced_rec_no, events_to_send);
730  s_CompareEvents(remote_events, remote_start_rec_no,
731  now, local_events,
732  remote_synced_rec_no, events_to_get);
733  return true;
734 }
735 
736 
737 Uint8
739 {
740  SSlotData& data = s_GetSlotData(slot);
741  CMiniMutexGuard guard(data.lock);
742 
744  Uint4 clean_to_recs = max_recs - CNCDistributionConf::GetCleanLogReserve();
746  Uint4 cleaned_cnt = 0;
747  Uint8 limit = s_GetMinLocalSyncedRecordNo(slot, data);
748  while (cleaned_cnt < max_clean_cnt) {
749  if (data.events.empty() || data.events.front()->rec_no > limit)
750  break; // Records are younger that should be deleted
751  // Delete the record, it is not required any more
752  delete data.events.front();
753  data.events.pop_front();
754  s_TotalRecords.Add(-1);
755  --data.rec_number;
756  ++cleaned_cnt;
757  }
758  if (data.rec_number > max_recs) {
759  while (data.rec_number > clean_to_recs && cleaned_cnt < max_clean_cnt)
760  {
761  delete data.events.front();
762  data.events.pop_front();
763  s_TotalRecords.Add(-1);
764  --data.rec_number;
765  ++cleaned_cnt;
766  }
767  }
768  return cleaned_cnt;
769 }
770 
771 Uint8
773 {
774  return s_TotalRecords.Get();
775 }
776 
777 Uint8
779 {
780  SSlotData& data = s_GetSlotData(slot);
781  return data.rec_number;
782 }
783 
784 bool
786 {
787  SSlotData& data = s_GetSlotData(slot);
789  return data.rec_number > max_recs;
790 }
791 
793 
CAtomicCounter –.
Definition: ncbicntr.hpp:71
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilit...
Definition: srv_sync.hpp:193
void Unlock(void)
Unlock the mutex.
Definition: srv_sync.cpp:119
void Lock(void)
Lock the mutex.
Definition: srv_sync.cpp:108
const string & PackedKey(void) const
Definition: netcached.hpp:180
static const TNCPeerList & GetPeers(void)
static Uint4 GetMaxCleanLogBatch(void)
static bool GetSlotByKey(const string &key, Uint2 &slot, Uint2 &time_bucket)
static const string & GetSyncLogFileName(void)
static const vector< Uint2 > & GetCommonSlots(Uint8 server)
static Uint8 GetSelfID(void)
static Uint4 GetMaxSlotLogEvents(void)
static Uint8 GetPeriodicSyncHeadTime(void)
static Uint4 GetCleanLogReserve(void)
static void Initialize(bool need_read_saved, Uint8 start_log_rec_no)
Definition: sync_log.cpp:450
static Uint8 AddEvent(Uint2 slot, SNCSyncEvent *event)
Definition: sync_log.cpp:559
static Uint8 Clean(Uint2 slot)
Definition: sync_log.cpp:738
static void GetLastSyncedRecNo(Uint8 server, Uint2 slot, Uint8 *local_synced_rec_no, Uint8 *remote_synced_rec_no)
Definition: sync_log.cpp:590
static bool GetEventsList(Uint8 server, Uint2 slot, Uint8 *local_start_rec_no, Uint8 *remote_start_rec_no, TReducedSyncEvents *events)
Definition: sync_log.cpp:633
static Uint8 GetCurrentRecNo(Uint2 slot)
Definition: sync_log.cpp:616
static bool Finalize(void)
Definition: sync_log.cpp:522
static bool GetSyncOperations(Uint8 server, Uint2 slot, Uint8 local_start_rec_no, Uint8 remote_start_rec_no, const TReducedSyncEvents &remote_events, TSyncEvents *events_to_get, TSyncEvents *events_to_send, Uint8 *local_synced_rec_no, Uint8 *remote_synced_rec_no)
Definition: sync_log.cpp:708
static Uint8 GetLastRecNo(void)
Definition: sync_log.cpp:627
static Uint8 GetLogSize(void)
Definition: sync_log.cpp:772
static bool IsOverLimit(Uint2 slot)
Definition: sync_log.cpp:785
static void SetLastSyncRecNo(Uint8 server, Uint2 slot, Uint8 local_synced_rec_no, Uint8 remote_synced_rec_no)
Definition: sync_log.cpp:603
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
Definition: tempstr.hpp:65
void erase(iterator pos)
Definition: map.hpp:167
const_iterator begin() const
Definition: map.hpp:151
const_iterator end() const
Definition: map.hpp:152
void clear()
Definition: map.hpp:169
const_iterator find(const key_type &key) const
Definition: map.hpp:153
Definition: map.hpp:338
const char * file_name[]
char data[12]
Definition: iconv.c:80
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
Definition: ncbimisc.hpp:843
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
#define NON_CONST_REVERSE_ITERATE(Type, Var, Cont)
Non constant version of REVERSE_ITERATE macro.
Definition: ncbimisc.hpp:834
#define NULL
Definition: ncbistd.hpp:225
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
Definition: ncbicntr.hpp:185
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
TValue Get(void) const THROWS_NONE
Get atomic counter value.
Definition: ncbicntr.hpp:168
void Critical(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1203
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
uint16_t Uint2
2-byte (16-bit) unsigned integer
Definition: ncbitype.h:101
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
FILE * file
const struct ncbi::grid::netcache::search::fields::KEY key
NCBI C++ auxiliary debug macros.
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
Definition: srv_diag.hpp:162
#define SRV_FATAL(msg)
Definition: srv_diag.hpp:173
SNCSyncEvent * prolong_event
Definition: sync_log.hpp:98
SNCSyncEvent * wr_or_rm_event
Definition: sync_log.hpp:97
Uint8 local_rec_no
Definition: sync_log.cpp:106
Uint8 remote_rec_no
Definition: sync_log.cpp:107
Uint8 orig_time
Definition: sync_log.cpp:92
Uint8 orig_server
Definition: sync_log.cpp:94
ENCSyncEvent event_type
Definition: sync_log.cpp:90
Uint8 rec_no
Definition: sync_log.cpp:89
Uint2 slot
Definition: sync_log.cpp:91
Uint8 local_time
Definition: sync_log.cpp:98
Uint8 orig_rec_no
Definition: sync_log.cpp:96
Single event record.
Definition: sync_log.hpp:52
Uint8 orig_time
Definition: sync_log.hpp:57
CNCBlobKeyLight key
Definition: sync_log.hpp:55
Uint8 orig_server
Definition: sync_log.hpp:59
Uint8 orig_rec_no
Definition: sync_log.hpp:61
Uint8 local_time
Definition: sync_log.hpp:63
Uint8 rec_no
Definition: sync_log.hpp:53
ENCSyncEvent event_type
Definition: sync_log.hpp:56
bool isOlder(const SNCSyncEvent &other) const
Definition: sync_log.hpp:70
SSlotData(const SSlotData &other)
Definition: sync_log.cpp:55
Uint8 rec_number
Definition: sync_log.cpp:50
CMiniMutex lock
Definition: sync_log.cpp:48
SSlotData(void)
Definition: sync_log.cpp:52
TSyncEvents events
Definition: sync_log.cpp:49
Uint8 remote_rec_no
Definition: sync_log.cpp:67
Uint8 local_rec_no
Definition: sync_log.cpp:66
SSrvSyncedData(void)
Definition: sync_log.cpp:69
map< Uint2, SSrvSyncedData > TSrvSyncedMap
Definition: sync_log.cpp:77
static TLog s_Log
Definition: sync_log.cpp:76
static void s_CompareEvents(const TReducedSyncEvents &src, Uint8 start_rec_no, Uint8 now, const TReducedSyncEvents &other, Uint8 *synced_rec_no, TSyncEvents *diff)
Definition: sync_log.cpp:390
static void s_ProcessProlong(const SBlobEvent &src_event, const SBlobEvent &other_event, TSyncEvents *diff)
Definition: sync_log.cpp:293
static const size_t kMaxKeyLength
Definition: sync_log.cpp:85
static bool s_ReadRecord(FILE *file)
Definition: sync_log.cpp:167
static Uint8 s_LastWrittenRecord
Definition: sync_log.cpp:81
static void s_ProcessWrite(SNCSyncEvent *src_event, const SBlobEvent &other_event, TSyncEvents *diff)
Definition: sync_log.cpp:334
map< Uint2, SSlotData > TLog
Definition: sync_log.cpp:75
static TSyncedRecsMap s_SyncedData
Definition: sync_log.cpp:79
static CMiniMutex s_GlobalLock
Definition: sync_log.cpp:74
static bool s_WriteHeader(FILE *file)
Definition: sync_log.cpp:204
static bool s_SpecialFind(const TReducedSyncEvents &container, TReducedSyncEvents::const_iterator &current_iterator, const string &key)
Definition: sync_log.cpp:358
map< Uint8, TSrvSyncedMap > TSyncedRecsMap
Definition: sync_log.cpp:78
static bool s_WriteRecord(FILE *file, Uint2 slot, const SNCSyncEvent *event)
Definition: sync_log.cpp:233
static CAtomicCounter s_TotalRecords
Definition: sync_log.cpp:80
static SSlotData & s_GetSlotData(Uint2 slot)
Definition: sync_log.cpp:262
static Uint8 s_GetMinLocalSyncedRecordNo(Uint2 slot, const SSlotData &data)
Definition: sync_log.cpp:272
static bool s_ReadHeader(FILE *file)
Definition: sync_log.cpp:115
ENCSyncEvent
Event types to log.
Definition: sync_log.hpp:43
@ eSyncProlong
Definition: sync_log.hpp:45
@ eSyncWrite
Definition: sync_log.hpp:44
@ eSyncRemove
Definition: sync_log.hpp:47
@ eSyncUpdate
Definition: sync_log.hpp:46
list< SNCSyncEvent * > TSyncEvents
Definition: sync_log.hpp:91
else result
Definition: token2.c:20
Modified on Mon May 20 05:03:20 2024 by modify_doxy.py rev. 669887