121 SRV_LOG(
Critical,
"Cannot read the number of saved pairs(server <--> slot) "
122 "with the last synced record numbers. Invalid file?");
129 if (fread(&record,
sizeof(record), 1,
file) != 1) {
130 SRV_LOG(
Critical,
"Cannot read last synced record numbers. Invalid file?");
142 if (peers.
find(it_srv->first) != peers.
end()) {
144 if (!slots.empty()) {
148 if (find(slots.begin(), slots.end(), it_slot->first)
151 srv_map.
erase(it_slot);
171 if (fread(&record,
sizeof(record), 1,
file) != 1)
176 if (fread(&key_size,
sizeof(key_size), 1,
file) != 1)
181 if (fread(
key, key_size, 1,
file) != 1)
195 s_Log[record.
slot].events.push_back(new_record);
208 count += it_srv->second.size();
222 if (fwrite(&record,
sizeof(record), 1,
file) != 1)
236 memset(&record, 0,
sizeof(record));
237 record.
rec_no =
event->rec_no;
246 if (fwrite(&record,
sizeof(record), 1,
file) != 1)
250 size_t key_size =
event->key.PackedKey().size();
251 if (fwrite(&key_size,
sizeof(key_size), 1,
file) != 1)
277 :
data.events.front()->rec_no);
346 diff->push_back(src_event);
352 diff->push_back(src_event);
363 for (; current_iterator != container.
end(); ++current_iterator) {
364 if (current_iterator->first ==
key)
366 if (current_iterator->first >
key)
394 Uint8* synced_rec_no,
399 Uint8 max_rec_no = 0;
403 for (; k != src_end; ++k) {
404 Uint8 op_rec_no = k->second.getMaxRecNoWithinTimeLimit(time_limit);
405 if (op_rec_no <= start_rec_no)
409 if (op_rec_no > max_rec_no)
410 max_rec_no = op_rec_no;
414 if (k->second.wr_or_rm_event !=
NULL
415 && (k->second.wr_or_rm_event->rec_no > start_rec_no))
417 diff->push_back(k->second.wr_or_rm_event);
419 else if (k->second.prolong_event !=
NULL
420 && (k->second.prolong_event->local_time < time_limit))
422 diff->push_back(k->second.prolong_event);
428 if (k->second.wr_or_rm_event !=
NULL
429 && (k->second.wr_or_rm_event->rec_no > start_rec_no))
431 s_ProcessWrite(k->second.wr_or_rm_event, other_event->second, diff);
433 if (k->second.prolong_event !=
NULL
434 && (k->second.prolong_event->local_time < time_limit))
441 *synced_rec_no = max_rec_no;
443 *synced_rec_no = start_rec_no;
456 if (!need_read_saved ||
file_name.empty()) {
465 FILE* log_file = fopen(
file_name.c_str(),
"r");
485 Uint4 recs_count = 0;
491 if (!feof(log_file)) {
493 <<
". Invalid file?");
510 data.rec_number =
data.events.size();
511 if (
data.rec_number != 0) {
512 Uint8 last_rec_no =
data.events.back()->rec_no;
528 FILE* log_file = fopen(
file_name.c_str(),
"w");
546 const TSyncEvents& events = it_slot->second.events;
562 Uint2 time_bucket = 0;
564 event->
key.
PackedKey(), real_slot, time_bucket) || real_slot != slot) {
566 ", expected slot: " << slot <<
", calculated slot: " << real_slot);
581 event->orig_rec_no =
event->rec_no;
583 data.events.push_back(event);
586 return event->rec_no;
592 Uint8* local_synced_rec_no,
593 Uint8* remote_synced_rec_no)
605 Uint8 local_synced_rec_no,
606 Uint8 remote_synced_rec_no)
620 if (!
data.events.empty())
621 return data.events.back()->rec_no;
635 Uint8* local_start_rec_no,
636 Uint8* remote_start_rec_no,
656 if (
data.events.empty()
657 || *local_start_rec_no <
data.events.front()->rec_no
674 if (evt->
rec_no < *local_start_rec_no) {
710 Uint8 local_start_rec_no,
711 Uint8 remote_start_rec_no,
715 Uint8* local_synced_rec_no,
716 Uint8* remote_synced_rec_no)
721 &remote_start_rec_no, &local_events))
729 local_synced_rec_no, events_to_send);
732 remote_synced_rec_no, events_to_get);
746 Uint4 cleaned_cnt = 0;
748 while (cleaned_cnt < max_clean_cnt) {
749 if (
data.events.empty() ||
data.events.front()->rec_no > limit)
752 delete data.events.front();
753 data.events.pop_front();
758 if (
data.rec_number > max_recs) {
759 while (
data.rec_number > clean_to_recs && cleaned_cnt < max_clean_cnt)
761 delete data.events.front();
762 data.events.pop_front();
781 return data.rec_number;
789 return data.rec_number > max_recs;
Mutex created to have minimum possible size (its size is 4 bytes) and to sleep using kernel capabilities...
void Unlock(void)
Unlock the mutex.
void Lock(void)
Lock the mutex.
const string & PackedKey(void) const
static const TNCPeerList & GetPeers(void)
static Uint4 GetMaxCleanLogBatch(void)
static bool GetSlotByKey(const string &key, Uint2 &slot, Uint2 &time_bucket)
static const string & GetSyncLogFileName(void)
static const vector< Uint2 > & GetCommonSlots(Uint8 server)
static Uint8 GetSelfID(void)
static Uint4 GetMaxSlotLogEvents(void)
static Uint8 GetPeriodicSyncHeadTime(void)
static Uint4 GetCleanLogReserve(void)
static void Initialize(bool need_read_saved, Uint8 start_log_rec_no)
static Uint8 AddEvent(Uint2 slot, SNCSyncEvent *event)
static Uint8 Clean(Uint2 slot)
static void GetLastSyncedRecNo(Uint8 server, Uint2 slot, Uint8 *local_synced_rec_no, Uint8 *remote_synced_rec_no)
static bool GetEventsList(Uint8 server, Uint2 slot, Uint8 *local_start_rec_no, Uint8 *remote_start_rec_no, TReducedSyncEvents *events)
static Uint8 GetCurrentRecNo(Uint2 slot)
static bool Finalize(void)
static bool GetSyncOperations(Uint8 server, Uint2 slot, Uint8 local_start_rec_no, Uint8 remote_start_rec_no, const TReducedSyncEvents &remote_events, TSyncEvents *events_to_get, TSyncEvents *events_to_send, Uint8 *local_synced_rec_no, Uint8 *remote_synced_rec_no)
static Uint8 GetLastRecNo(void)
static Uint8 GetLogSize(void)
static bool IsOverLimit(Uint2 slot)
static void SetLastSyncRecNo(Uint8 server, Uint2 slot, Uint8 local_synced_rec_no, Uint8 remote_synced_rec_no)
static CSrvTime Current(void)
Exact current time with precision up to nanoseconds.
Uint8 AsUSec(void) const
Converts object's value to microseconds since epoch.
CTempString implements a light-weight string on top of a storage buffer whose lifetime management is ...
container_type::const_iterator const_iterator
const_iterator begin() const
const_iterator end() const
const_iterator find(const key_type &key) const
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
#define ERASE_ITERATE(Type, Var, Cont)
Non-constant version with ability to erase current element, if container permits.
#define NON_CONST_ITERATE(Type, Var, Cont)
Non-constant version of ITERATE macro.
#define NON_CONST_REVERSE_ITERATE(Type, Var, Cont)
Non-constant version of REVERSE_ITERATE macro.
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
TValue Get(void) const THROWS_NONE
Get atomic counter value.
void Critical(CExceptionArgs_Base &args)
void Warning(CExceptionArgs_Base &args)
uint32_t Uint4
4-byte (32-bit) unsigned integer
uint16_t Uint2
2-byte (16-bit) unsigned integer
uint64_t Uint8
8-byte (64-bit) unsigned integer
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
const struct ncbi::grid::netcache::search::fields::KEY key
NCBI C++ auxiliary debug macros.
#define SRV_LOG(sev, msg)
Macro to be used for printing log messages.
SNCSyncEvent * prolong_event
SNCSyncEvent * wr_or_rm_event
bool isOlder(const SNCSyncEvent &other) const
SSlotData(const SSlotData &other)
map< Uint2, SSrvSyncedData > TSrvSyncedMap
static void s_CompareEvents(const TReducedSyncEvents &src, Uint8 start_rec_no, Uint8 now, const TReducedSyncEvents &other, Uint8 *synced_rec_no, TSyncEvents *diff)
static void s_ProcessProlong(const SBlobEvent &src_event, const SBlobEvent &other_event, TSyncEvents *diff)
static const size_t kMaxKeyLength
static bool s_ReadRecord(FILE *file)
static Uint8 s_LastWrittenRecord
static void s_ProcessWrite(SNCSyncEvent *src_event, const SBlobEvent &other_event, TSyncEvents *diff)
map< Uint2, SSlotData > TLog
static TSyncedRecsMap s_SyncedData
static CMiniMutex s_GlobalLock
static bool s_WriteHeader(FILE *file)
static bool s_SpecialFind(const TReducedSyncEvents &container, TReducedSyncEvents::const_iterator &current_iterator, const string &key)
map< Uint8, TSrvSyncedMap > TSyncedRecsMap
static bool s_WriteRecord(FILE *file, Uint2 slot, const SNCSyncEvent *event)
static CAtomicCounter s_TotalRecords
static SSlotData & s_GetSlotData(Uint2 slot)
static Uint8 s_GetMinLocalSyncedRecordNo(Uint2 slot, const SSlotData &data)
static bool s_ReadHeader(FILE *file)
ENCSyncEvent
Event types to log.
list< SNCSyncEvent * > TSyncEvents