NCBI C++ ToolKit
test_mt.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: test_mt.cpp 93109 2021-03-05 14:17:23Z grichenk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Aleksey Grichenko
27  *
28  * File Description:
29  * Wrapper for testing modules in MT environment
30  *
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/test_mt.hpp>
35 #include <corelib/ncbimtx.hpp>
36 #include <corelib/ncbi_system.hpp>
37 #include <corelib/ncbi_param.hpp>
38 #include <math.h>
39 #include <common/test_assert.h> /* This header must go last */
40 
41 
43 
44 // Uncomment the definition to use platform native threads rather
45 // than CThread.
46 //#define USE_NATIVE_THREADS
47 
50 
51 // Default values
52 unsigned int s_NumThreads = 34;
53 int s_SpawnBy = 6;
54 
55 // Next test thread index
56 static volatile unsigned int s_NextIndex = 0;
57 
58 #define TESTAPP_LOG_POST(x) do { ++m_LogMsgCount; LOG_POST(x); } while (0)
59 
60 #define TESTAPP_ASSERT(expr, msg) \
61  do { \
62  if (!(expr)) { \
63  cerr << "Assertion failed: (" << #expr << ") --- " << msg << endl; \
64  assert(false); \
65  } \
66  } while (0)
67 
68 /////////////////////////////////////////////////////////////////////////////
69 // Randomization parameters
70 
71 // if (rand() % 100 < threshold) then use cascading threads
72 NCBI_PARAM_DECL(unsigned int, TEST_MT, Cascading);
73 NCBI_PARAM_DEF(unsigned int, TEST_MT, Cascading, 25);
74 
75 // calculate # of thread groups
76 static string s_GroupsCount(void)
77 {
78  return NStr::UIntToString( (unsigned int)sqrt((double)s_NumThreads));
79 }
80 NCBI_PARAM_DECL(string, TEST_MT, GroupsCount);
81 NCBI_PARAM_DEF_WITH_INIT(string, TEST_MT, GroupsCount, "", s_GroupsCount);
82 
83 // group.has_sync_point = (rand() % 100) < threshold;
84 NCBI_PARAM_DECL(unsigned int, TEST_MT, IntragroupSyncPoint);
85 NCBI_PARAM_DEF(unsigned int, TEST_MT, IntragroupSyncPoint, 75);
86 
87 /////////////////////////////////////////////////////////////////////////////
88 // Test thread
89 //
90 
91 class CTestThread : public CThread
92 {
93 public:
94  static void StartCascadingThreads(void);
95 
96  CTestThread(int id);
97  virtual void SyncPoint(void) {};
98  virtual void GlobalSyncPoint(void);
99 
100 protected:
102  virtual void* Main(void);
103  virtual void OnExit(void);
104 
105  int m_Idx;
106 
107 #ifdef USE_NATIVE_THREADS
109 
110 public:
111  void RunNative(void);
112  void JoinNative(void** result);
113 
114  friend TWrapperRes NativeWrapper(TWrapperArg arg);
115 #endif
116 };
117 
118 
119 static CSemaphore s_Semaphore(0, INT_MAX); /* For GlobalSyncPoint()*/
120 static CAtomicCounter s_SyncCounter; /* For GlobalSyncPoint()*/
121 static CAtomicCounter s_NumberOfThreads; /* For GlobalSyncPoint()*/
122 
123 
125  : m_Idx(idx)
126 {
127  /* We want to know total number of threads, and the easiest way is to make
128  them register themselves */
130  if ( s_Application != 0 )
132  "CTestThread::CTestThread() - failed to initialize thread " << m_Idx);
133 }
134 
135 
137 {
139  auto num = s_NumberOfThreads.Get();
140  TESTAPP_ASSERT(num >= 0,
141  "CTestThread::~CTestThread() - invalid number of threads: " << num);
142  if ( s_Application != 0 )
144  "CTestThread::~CTestThread() - failed to destroy thread " << m_Idx);
145 }
146 
147 
149 {
150  if ( s_Application != 0 )
152  "CTestThread::OnExit() - error exiting thread " << m_Idx);
153 }
154 
155 
157 {
158  /* Semaphore is supposed to have zero value when threads come here,
159  so Wait() causes stop */
160  if (s_SyncCounter.Add(1) != s_NumberOfThreads.Get()) {
161  s_Semaphore.Wait();
162  return;
163  }
164  /* If we are the last thread to come to sync point, we yield
165  so that threads that were waiting for us go first */
166  if (s_NumberOfThreads.Get() > 1) {
167  s_Semaphore.Post((unsigned int)s_NumberOfThreads.Get() - 1);
168  s_SyncCounter.Set(0);
169  SleepMilliSec(0);
170  }
171 }
172 
173 
174 #ifdef USE_NATIVE_THREADS
175 
176 TWrapperRes NativeWrapper(TWrapperArg arg)
177 {
178  CTestThread* thread_obj = static_cast<CTestThread*>(arg);
179  thread_obj->Main();
180  return 0;
181 }
182 
183 
184 #if defined(NCBI_POSIX_THREADS)
185 extern "C" {
187 
188  static TWrapperRes NativeWrapperCaller(TWrapperArg arg) {
189  return NativeWrapper(arg);
190  }
191 }
192 #elif defined(NCBI_WIN32_THREADS)
193 extern "C" {
194  typedef TWrapperRes (WINAPI *FSystemWrapper)(TWrapperArg);
195 
196  static TWrapperRes WINAPI NativeWrapperCaller(TWrapperArg arg) {
197  return NativeWrapper(arg);
198  }
199 }
200 #endif
201 
202 
203 void CTestThread::RunNative(void)
204 {
205  // Run as the platform native thread rather than CThread
206  // Not all functionality will work in this mode. E.g. TLS
207  // cleanup can not be done automatically.
208 #if defined(NCBI_WIN32_THREADS)
209  // We need this parameter on WinNT - cannot use NULL instead!
210  DWORD thread_id;
211  // Suspend thread to adjust its priority
212  DWORD creation_flags = 0;
213  m_Handle = CreateThread(NULL, 0, NativeWrapperCaller,
214  this, creation_flags, &thread_id);
215  TESTAPP_ASSERT(m_Handle != NULL, "CTestThread::RunNative() - failed to create thread");
216  // duplicate handle to adjust security attributes
217  HANDLE oldHandle = m_Handle;
218  TESTAPP_ASSERT(DuplicateHandle(GetCurrentProcess(), oldHandle,
219  GetCurrentProcess(), &m_Handle,
220  0, FALSE, DUPLICATE_SAME_ACCESS),
221  "CTestThread::RunNative() - failed to duplicate thread handle");
222  TESTAPP_ASSERT(CloseHandle(oldHandle),
223  "CTestThread::RunNative() - failed to close thread handle");
224 #elif defined(NCBI_POSIX_THREADS)
225  pthread_attr_t attr;
226  TESTAPP_ASSERT(pthread_attr_init(&attr) == 0,
227  "CTestThread::RunNative() - failed to init thread attributes");
228  TESTAPP_ASSERT(pthread_create(&m_Handle, &attr,
229  NativeWrapperCaller, this) == 0,
230  "CTestThread::RunNative() - failed to create thread");
231  TESTAPP_ASSERT(pthread_attr_destroy(&attr) == 0,
232  "CTestThread::RunNative() - failed to destroy thread attributes");
233 #else
234  if (flags & fRunAllowST) {
235  Wrapper(this);
236  }
237  else {
238  TESTAPP_ASSERT(0, "CTestThread::RunNative() - threads are not supported");
239  }
240 #endif
241 }
242 
243 
244 void CTestThread::JoinNative(void** result)
245 {
246  // Join (wait for) and destroy
247 #if defined(NCBI_WIN32_THREADS)
248  TESTAPP_ASSERT(WaitForSingleObject(m_Handle, INFINITE) == WAIT_OBJECT_0,
249  "CTestThread::JoinNative() - failed to join thread");
250  DWORD status;
251  TESTAPP_ASSERT(GetExitCodeThread(m_Handle, &status)
252  && status != DWORD(STILL_ACTIVE),
253  "CTestThread::JoinNative() - failed to get thread exit code");
254  TESTAPP_ASSERT(CloseHandle(m_Handle),
255  "CTestThread::JoinNative() - failed to close thread handle");
256  m_Handle = NULL;
257 #elif defined(NCBI_POSIX_THREADS)
258  TESTAPP_ASSERT(pthread_join(m_Handle, 0) == 0,
259  "CTestThread::JoinNative() - failed to join thread");
260 #endif
261  *result = this;
262 }
263 
264 #endif // USE_NATIVE_THREADS
265 
266 
268 
270 {
271  int spawn_max;
272  int first_idx;
273  {{
274  CFastMutexGuard spawn_guard(s_GlobalLock);
275  spawn_max = s_NumThreads - s_NextIndex;
276  if (spawn_max > s_SpawnBy) {
277  spawn_max = s_SpawnBy;
278  }
279  first_idx = s_NextIndex;
281  }}
282  // Spawn more threads
283  for (int i = first_idx; i < first_idx + spawn_max; i++) {
284  thr[i] = new CTestThread(i);
285  // Allow threads to run even in single thread environment
286 #ifdef USE_NATIVE_THREADS
287  thr[i]->RunNative();
288 #else
289  thr[i]->Run(CThread::fRunAllowST);
290 #endif
291  }
292 }
293 
294 void* CTestThread::Main(void)
295 {
297  // Run the test
298  if ( s_Application != 0 && s_Application->Thread_Run(m_Idx) ) {
299  return this;
300  }
301 
302  return 0;
303 }
304 
305 /////////////////////////////////////////////////////////////////////////////
306 // Thread group
307 
308 class CThreadGroup;
310 {
311 public:
312  CInGroupThread(CThreadGroup& group, int id);
313  virtual void SyncPoint(void);
314 
315 protected:
316  ~CInGroupThread(void);
317  virtual void* Main(void);
319 };
320 
321 class CThreadGroup : public CObject
322 {
323 public:
324  CThreadGroup(
325  unsigned int number_of_threads,
326  bool has_sync_point);
327  ~CThreadGroup(void);
328 
329  void Go(void);
330  void SyncPoint(void);
331 
332  void ThreadWait(void);
333  void ThreadComplete(void);
334 
335 private:
336  unsigned int m_Number_of_threads;
340  unsigned int m_SyncCounter;
341 };
342 
343 
346 
348  : CTestThread(id), m_Group(group)
349 {
350 }
351 
353 {
354 }
355 
357 {
358  m_Group.SyncPoint();
359 }
360 
361 
363 {
365  s_ThreadIdxTLS.SetValue(reinterpret_cast<int*>((intptr_t)m_Idx));
366  // Run the test
367  if ( s_Application != 0 && s_Application->Thread_Run(m_Idx) ) {
369  return this;
370  }
371  return 0;
372 }
373 
375  unsigned int number_of_threads,
376  bool has_sync_point)
377  : m_Number_of_threads(number_of_threads), m_Has_sync_point(has_sync_point),
378  m_Semaphore(0,number_of_threads), m_SyncCounter(0)
379 {
380  for (unsigned int t = 0; t < m_Number_of_threads; ++t) {
381  thr[s_NextIndex] = new CInGroupThread(*this, s_NextIndex);
382 #ifdef USE_NATIVE_THREADS
383  thr[s_NextIndex]->RunNative();
384 #else
385  thr[s_NextIndex]->Run();
386 #endif
387  ++s_NextIndex;
388  }
389 }
390 
392 {
393 }
394 
395 inline
397 {
400 }
401 
403 {
404  if (m_Has_sync_point) {
405  bool reached = false;
406  m_Mutex.Lock();
407  ++m_SyncCounter;
409  m_SyncCounter = 0;
410  reached = true;
411  }
412  m_Mutex.Unlock();
413  if (reached) {
414  if (m_Number_of_threads > 1) {
416  SleepMilliSec(0);
417  }
418  } else {
419  m_Semaphore.Wait();
420  }
421  }
422 }
423 
424 
425 inline
427 {
429  auto num = s_NumberOfThreads.Get();
430  TESTAPP_ASSERT(num >= 0,
431  "CThreadGroup::ThreadWait() - invalid number of threads: " << num);
432  m_Semaphore.Wait();
433 }
434 
435 inline
437 {
438  if (m_Has_sync_point) {
439  m_Semaphore.Post();
440  }
441 }
442 
443 
444 /////////////////////////////////////////////////////////////////////////////
445 // Test application
446 
447 
449 {
450  m_Min = m_Max = 0;
451  m_NextGroup = 0;
452  m_LogMsgCount = 0;
453  s_Application = this;
455 }
456 
457 
459 {
460  s_Application = 0;
461 }
462 
463 
465 {
466  // Prepare command line descriptions
467  unique_ptr<CArgDescriptions> arg_desc(new CArgDescriptions);
468 
469  // s_NumThreads
470  arg_desc->AddDefaultKey
471  ("threads", "NumThreads",
472  "Total number of threads to create and run",
474  arg_desc->SetConstraint
476 
477  // s_NumThreads (emulation in ST)
478  arg_desc->AddDefaultKey
479  ("repeats", "NumRepeats",
480  "In non-MT mode only(!) -- how many times to repeat the test. "
481  "If passed 0, then the value of argument `-threads' will be used.",
483  arg_desc->SetConstraint
484  ("repeats", new CArgAllow_Integers(0, k_NumThreadsMax));
485 
486  // s_SpawnBy
487  arg_desc->AddDefaultKey
488  ("spawnby", "SpawnBy",
489  "Threads spawning factor",
491  arg_desc->SetConstraint
492  ("spawnby", new CArgAllow_Integers(k_SpawnByMin, k_SpawnByMax));
493 
494  arg_desc->AddOptionalKey("seed", "Randomization",
495  "Randomization seed value",
497 
498  arg_desc->SetUsageContext(GetArguments().GetProgramBasename(),
499  "MT-environment test");
500 
501  // Let test application add its own arguments
502  TestApp_Args(*arg_desc);
503 
504  SetupArgDescriptions(arg_desc.release());
505 }
506 
507 
509 {
510  // Process command line
511  const CArgs& args = GetArgs();
512 
513 #if !defined(NCBI_THREADS)
514  s_NumThreads = args["repeats"].AsInteger();
515  if ( !s_NumThreads )
516 #endif
517  s_NumThreads = args["threads"].AsInteger();
518 
519 #if !defined(NCBI_THREADS)
520  // Set reasonable repeats if not set through the argument
521  if (!args["repeats"].AsInteger()) {
522  unsigned int repeats = s_NumThreads / 6;
523  if (repeats < 4)
524  repeats = 4;
525  if (repeats < s_NumThreads)
526  s_NumThreads = repeats;
527  }
528 #endif
529 
530  s_SpawnBy = args["spawnby"].AsInteger();
531 
533  "CThreadedApp::Run() - failed to initialize application");
534 
535  unsigned int seed = GetArgs()["seed"]
536  ? static_cast<unsigned int>(GetArgs()["seed"].AsInteger())
537  : (static_cast<unsigned int>(CCurrentProcess::GetPid()) ^
538  static_cast<unsigned int>(time(NULL)) % 1000000);
539  TESTAPP_LOG_POST("Randomization seed value: " << seed);
540  srand(seed);
541 
542  unsigned int threshold = NCBI_PARAM_TYPE(TEST_MT, Cascading)::GetDefault();
543  if (threshold > 100) {
544  ERR_FATAL("Cascading threshold must be less than 100");
545  }
546  bool cascading = (static_cast<unsigned int>(rand() % 100)) < threshold;
547 #if !defined(NCBI_THREADS)
548  cascading = true;
549 #endif
550  if ( !cascading ) {
553  }
554  cascading = cascading || (m_ThreadGroups.size() == 0);
555 
556 #if defined(NCBI_THREADS)
557  TESTAPP_LOG_POST("Running " << s_NumThreads << " threads");
558 #else
559  TESTAPP_LOG_POST("Simulating " << s_NumThreads << " threads in ST mode");
560 #endif
561 
562  if (cascading) {
564  } else {
565  unsigned int start_now = x_InitializeDelayedStart();
566 
567  for (unsigned int g = 0; g < m_ThreadGroups.size(); ++g) {
568  thr_group[g] = new CThreadGroup
569  (m_ThreadGroups[g].number_of_threads,
570  m_ThreadGroups[g].has_sync_point);
571  }
572  x_StartThreadGroup(start_now);
573  }
574 
575  // Wait for all threads
576  if ( cascading ) {
577  for (unsigned int i = 0; i < s_NumThreads; i++) {
578  void* join_result;
579  // make sure all threads have started
580  TESTAPP_ASSERT(thr[i].NotEmpty(),
581  "CThreadedApp::Run() - thread " << i << " has not started");
582 #ifdef USE_NATIVE_THREADS
583  if (thr[i]) {
584  thr[i]->JoinNative(&join_result);
585  TESTAPP_ASSERT(join_result,
586  "CThreadedApp::Run() - thread " << i << " failed to pass result to Join()");
587  }
588 #else
589  thr[i]->Join(&join_result);
590  TESTAPP_ASSERT(join_result,
591  "CThreadedApp::Run() - thread " << i << " failed to pass result to Join()");
592 #endif
593  }
594  } else {
595  // join only those that started
596  unsigned int i = 0;
597  for (unsigned int g = 0; g < m_NextGroup; ++g) {
598  for (unsigned int t = 0;
599  t < m_ThreadGroups[g].number_of_threads; ++t, ++i) {
600  void* join_result;
601  thr[i]->Join(&join_result);
602  TESTAPP_ASSERT(join_result,
603  "CThreadedApp::Run() - thread " << i << " failed to pass result to Join()");
604  }
605  }
607  "CThreadedApp::Run() - ivalid number of started threads: " << m_Reached.size());
608  }
609 
611  "CThreadedApp::Run() - error exiting application");
612 
613  // Destroy all threads
614  for (unsigned int i=0; i<s_NumThreads; i++) {
615  thr[i].Reset();
616  }
617  // Destroy all groups
618  for (unsigned int i=0; i<m_ThreadGroups.size(); i++) {
619  thr_group[i].Reset();
620  }
621 
622  return 0;
623 }
624 
625 
627 {
628  unsigned int count = NStr::StringToUInt
629  (NCBI_PARAM_TYPE(TEST_MT, GroupsCount)::GetDefault());
630  if (count == 0) {
631  return;
632  }
633 
634  if(count > s_NumThreads) {
635  ERR_FATAL("Thread groups with no threads are not allowed");
636  }
637 
638  unsigned int threshold =
639  NCBI_PARAM_TYPE(TEST_MT, IntragroupSyncPoint)::GetDefault();
640  if (threshold > 100) {
641  ERR_FATAL("IntragroupSyncPoint threshold must be less than 100");
642  }
643 
644  for (unsigned int g = 0; g < count; ++g) {
645  SThreadGroup group;
646  // randomize intra-group sync points
647  group.has_sync_point = ((unsigned int)(rand() % 100)) < threshold;
648  group.number_of_threads = 1;
649  m_ThreadGroups.push_back(group);
650  }
651 
652  if (s_NumThreads > count) {
653  unsigned int threads_left = s_NumThreads - count;
654  for (unsigned int t = 0; t < threads_left; ++t) {
655  // randomize # of threads
656  m_ThreadGroups[ rand() % count ].number_of_threads += 1;
657  }
658  }
659 }
660 
661 
663 {
664  size_t count = m_ThreadGroups.size();
665  if (count != 0) {
666  TESTAPP_LOG_POST("Thread groups: " << count);
667  TESTAPP_LOG_POST("Number of delayed thread groups: from "
668  << m_Min << " to " << m_Max);
669  TESTAPP_LOG_POST("------------------------");
670  TESTAPP_LOG_POST("group threads sync_point");
671  for (unsigned int g = 0; g < count; ++g) {
672  CNcbiOstrstream os;
673  os.width(6);
674  os << left << g;
675  os.width(8);
676  os << left << m_ThreadGroups[g].number_of_threads;
677  if (m_ThreadGroups[g].has_sync_point) {
678  os << "yes";
679  } else {
680  os << "no ";
681  }
683  }
684  TESTAPP_LOG_POST("------------------------");
685  }
686 }
687 
688 
690 {
691  const unsigned int count = static_cast<unsigned int>(m_ThreadGroups.size());
692  unsigned int start_now = count;
693  unsigned int g;
694  if (m_Max == 0)
695  return start_now;
696 
697  for (g = 0; g < m_Max; ++g) {
698  m_Delayed.push_back(0);
699  }
700 
701  for (g = 1; g < count; ++g) {
702  unsigned int dest = rand() % (m_Max+1);
703  if (dest != 0) {
704  m_Delayed[dest - 1] += 1;
705  --start_now;
706  }
707  }
708 
709  CNcbiOstrstream os;
710  os << "Delayed thread groups: " << (count - start_now)
711  << ", starting order: " << start_now;
712  for (g = 0; g < m_Max; ++g) {
713  os << '+' << m_Delayed[g];
714  }
716 
717  return start_now;
718 }
719 
720 
721 void CThreadedApp::x_StartThreadGroup(unsigned int count)
722 {
724  while (count--) {
725  thr_group[m_NextGroup++]->Go();
726  }
727 }
728 
729 
730 /////////////////////////////////////////////////////////////////////////////
731 
732 bool CThreadedApp::Thread_Init(int /*idx*/)
733 {
734  return true;
735 }
736 
737 
738 bool CThreadedApp::Thread_Run(int /*idx*/)
739 {
740  return true;
741 }
742 
743 
744 bool CThreadedApp::Thread_Exit(int /*idx*/)
745 {
746  return true;
747 }
748 
749 
751 {
752  return true;
753 }
754 
756 {
757  return true;
758 }
759 
761 {
762  return true;
763 }
764 
765 
767 {
768  int idx = (int)(intptr_t(s_ThreadIdxTLS.GetValue()));
769  thr[idx]->SyncPoint();
770 }
771 
772 
774 {
775  {{
777  if (!m_Delayed.empty()) {
778  TESTAPP_LOG_POST("There were delayed threads, running them now, "
779  "because TestApp_GlobalSyncPoint() was called");
780  for (size_t i = m_Reached.size(); i < m_Delayed.size(); i++) {
783  }
784  }
785  }}
786  int idx = static_cast<int>(intptr_t(s_ThreadIdxTLS.GetValue()));
787  thr[idx]->GlobalSyncPoint();
788 }
789 
790 
792  unsigned int num_min, unsigned int num_max)
793 {
794  m_Min = num_min;
795  m_Max = num_max;
796 }
797 
798 
800 {
802  if (!m_Delayed.empty() && m_Reached.find(name) == m_Reached.end()) {
803  m_Reached.insert(name);
804  if (m_Reached.size() <= m_Delayed.size()) {
806  }
807  }
808 }
809 
810 
812 {
813  return true;
814 }
815 
816 
CArgAllow_Integers –.
Definition: ncbiargs.hpp:1751
CArgDescriptions –.
Definition: ncbiargs.hpp:541
CArgs –.
Definition: ncbiargs.hpp:379
CAtomicCounter –.
Definition: ncbicntr.hpp:71
CFastMutex –.
Definition: ncbimtx.hpp:667
CThreadGroup & m_Group
Definition: test_mt.cpp:318
CInGroupThread(CThreadGroup &group, int id)
Definition: test_mt.cpp:347
~CInGroupThread(void)
Definition: test_mt.cpp:352
virtual void SyncPoint(void)
Definition: test_mt.cpp:356
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
Definition: test_mt.cpp:362
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
Definition: ncbistre.hpp:802
CObject –.
Definition: ncbiobj.hpp:180
CRef –.
Definition: ncbiobj.hpp:618
CSemaphore –.
Definition: ncbimtx.hpp:1375
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
virtual void GlobalSyncPoint(void)
Definition: test_mt.cpp:156
~CTestThread(void)
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CTestThread(CSeqDBAtlas &atlas)
static void StartCascadingThreads(void)
Definition: test_mt.cpp:269
virtual void SyncPoint(void)
Definition: test_mt.cpp:97
virtual void OnExit(void)
Override this to execute finalization code.
Definition: test_mt.cpp:148
void Go(void)
Definition: test_mt.cpp:396
unsigned int m_Number_of_threads
Definition: test_mt.cpp:336
bool m_Has_sync_point
Definition: test_mt.cpp:337
unsigned int m_SyncCounter
Definition: test_mt.cpp:340
~CThreadGroup(void)
Definition: test_mt.cpp:391
void ThreadWait(void)
Definition: test_mt.cpp:426
CThreadGroup(unsigned int number_of_threads, bool has_sync_point)
Definition: test_mt.cpp:374
void SyncPoint(void)
Definition: test_mt.cpp:402
CFastMutex m_Mutex
Definition: test_mt.cpp:339
void ThreadComplete(void)
Definition: test_mt.cpp:436
CSemaphore m_Semaphore
Definition: test_mt.cpp:338
CThreadedApp –.
Definition: test_mt.hpp:86
iterator_bool insert(const value_type &val)
Definition: set.hpp:149
size_type size() const
Definition: set.hpp:132
const_iterator find(const key_type &key) const
Definition: set.hpp:137
const_iterator end() const
Definition: set.hpp:136
static uch flags
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
Definition: ncbiapp.cpp:285
virtual void SetupArgDescriptions(CArgDescriptions *arg_desc)
Setup the command line argument descriptions.
Definition: ncbiapp.cpp:1175
const CNcbiArguments & GetArguments(void) const
Get the application's cached unprocessed command-line arguments.
@ eInteger
Convertible into an integer number (int or Int8)
Definition: ncbiargs.hpp:592
#define NULL
Definition: ncbistd.hpp:225
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
Definition: ncbicntr.hpp:185
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
Definition: ncbicntr.hpp:278
TValue Get(void) const THROWS_NONE
Get atomic counter value.
Definition: ncbicntr.hpp:168
#define ERR_FATAL(message)
Posting fatal error and abort.
Definition: ncbidiag.hpp:240
const unsigned int k_NumThreadsMin
Minimum number of threads.
Definition: test_mt.hpp:54
vector< SThreadGroup > m_ThreadGroups
Definition: test_mt.hpp:207
const unsigned int k_NumThreadsMax
Maximum number of threads.
Definition: test_mt.hpp:57
unsigned int x_InitializeDelayedStart(void)
Definition: test_mt.cpp:689
CThreadedApp(void)
Constructor.
Definition: test_mt.cpp:448
virtual bool TestApp_Exit(void)
Override this method to execute code after all threads terminate.
Definition: test_mt.cpp:811
CFastMutex m_AppMutex
Definition: test_mt.hpp:202
unsigned int m_LogMsgCount
Definition: test_mt.hpp:210
const int k_SpawnByMax
Maximum number of spawn by threads.
Definition: test_mt.hpp:63
void SetNumberOfDelayedStartSyncPoints(unsigned int num_min, unsigned int num_max)
Set allowed range of delayed-start sync points.
Definition: test_mt.cpp:791
~CThreadedApp(void)
Destructor.
Definition: test_mt.cpp:458
virtual bool Thread_Run(int idx)
Run the thread.
Definition: test_mt.cpp:738
int Run(void)
Run the threads.
Definition: test_mt.cpp:508
set< string > m_Reached
Definition: test_mt.hpp:203
virtual bool Thread_Destroy(int idx)
Destroy the thread.
Definition: test_mt.cpp:750
void Init(void)
Initialize the app.
Definition: test_mt.cpp:464
unsigned int m_Min
Definition: test_mt.hpp:204
unsigned int s_NumThreads
Definition: test_mt.cpp:52
virtual bool Thread_Init(int idx)
Initialize the thread.
Definition: test_mt.cpp:732
const int k_SpawnByMin
Minimum number of spawn by threads.
Definition: test_mt.hpp:60
volatile unsigned int m_NextGroup
Definition: test_mt.hpp:205
virtual bool Thread_Exit(int idx)
Exit the thread.
Definition: test_mt.cpp:744
unsigned int number_of_threads
Number of threads in the group.
Definition: test_mt.hpp:133
void TestApp_IntraGroupSyncPoint(void)
Wait until other threads in group reach the same sync point The call may be ignored by the applicatio...
Definition: test_mt.cpp:766
unsigned int m_Max
Definition: test_mt.hpp:204
void x_PrintThreadGroups(void)
Definition: test_mt.cpp:662
virtual bool TestApp_Init(void)
Override this method to execute code before running threads.
Definition: test_mt.cpp:760
void TestApp_GlobalSyncPoint(void)
Wait until ALL threads reach a call of TestApp_GlobalSyncPoint().
Definition: test_mt.cpp:773
void x_InitializeThreadGroups(void)
Definition: test_mt.cpp:626
vector< unsigned int > m_Delayed
Definition: test_mt.hpp:206
void TestApp_DelayedStartSyncPoint(const string &name)
Start threads that belong to delayed group (this call may be ignored to provide more randomness).
Definition: test_mt.cpp:799
void x_StartThreadGroup(unsigned int count)
Definition: test_mt.cpp:721
bool has_sync_point
TRUE, if the group has intra-group sync point.
Definition: test_mt.hpp:135
virtual bool TestApp_Args(CArgDescriptions &args)
Override this method to add your custom arguments.
Definition: test_mt.cpp:755
int s_SpawnBy
Definition: test_mt.cpp:53
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
Definition: ncbi_param.hpp:149
int intptr_t
Definition: ncbitype.h:185
static TPid GetPid(void)
Get process identifier (pid) for the current process.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
static string SizetToString(size_t value, TNumToStringFlags flags=0, int base=10)
Convert size_t to string.
Definition: ncbistr.cpp:2751
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5083
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
Definition: ncbistr.hpp:5108
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
void * TWrapperArg
Define platform-dependent argument wrapper.
TThreadHandle m_Handle
platform-dependent thread handle
Definition: ncbithr.hpp:651
void Lock(void)
Acquire mutex for the current thread with no nesting checks.
pthread_t TThreadHandle
Define platform-dependent thread handle type.
void Wait(void)
Wait on semaphore.
Definition: ncbimtx.cpp:1787
static void InitializeMainThreadId(void)
Initialize main thread's TID.
Definition: ncbithr.cpp:482
static TWrapperRes Wrapper(TWrapperArg arg)
Function to use (internally) as the thread's startup function.
Definition: ncbithr.cpp:552
void * TWrapperRes
Define platform-dependent result wrapper.
void Post(unsigned int count=1)
Increment the semaphore by "count".
Definition: ncbimtx.cpp:1971
void Unlock(void)
Release mutex with no owner or nesting checks.
@ fRunAllowST
Allow threads to run in single thread builds.
Definition: ncbithr.hpp:549
#define HANDLE
An abstraction for a file handle.
Definition: mdb.c:383
unsigned int
A callback function used to compare two keys in a database.
Definition: types.hpp:1210
int i
EIPRangeType t
Definition: ncbi_localip.c:101
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Multi-threading – mutexes; rw-locks; semaphore.
TWrapperRes(* FSystemWrapper)(TWrapperArg)
Definition: ncbithr.cpp:679
unsigned int DWORD
Definition: sqltypes.h:98
Thread group description parameters.
Definition: test_mt.hpp:131
static CMiniMutex s_GlobalLock
Definition: sync_log.cpp:74
static string s_GroupsCount(void)
Definition: test_mt.cpp:76
NCBI_PARAM_DEF(unsigned int, TEST_MT, Cascading, 25)
DEFINE_STATIC_FAST_MUTEX(s_GlobalLock)
static CThreadedApp * s_Application
Definition: test_mt.cpp:49
CRef< CTestThread > thr[k_NumThreadsMax]
Definition: test_mt.cpp:267
static CAtomicCounter s_SyncCounter
Definition: test_mt.cpp:120
NCBI_PARAM_DECL(unsigned int, TEST_MT, Cascading)
NCBI_PARAM_DEF_WITH_INIT(string, TEST_MT, GroupsCount, "", s_GroupsCount)
#define TESTAPP_LOG_POST(x)
Definition: test_mt.cpp:58
static CSemaphore s_Semaphore(0, INT_MAX)
#define TESTAPP_ASSERT(expr, msg)
Definition: test_mt.cpp:60
static volatile unsigned int s_NextIndex
Definition: test_mt.cpp:56
static CAtomicCounter s_NumberOfThreads
Definition: test_mt.cpp:121
static CStaticTls< int > s_ThreadIdxTLS
Definition: test_mt.cpp:345
static CRef< CThreadGroup > thr_group[k_NumThreadsMax]
Definition: test_mt.cpp:344
Wrapper for testing modules in MT environment.
static int seed
Definition: test_table.cpp:132
@ FALSE
Definition: testodbc.c:27
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)
Definition: thrddgri.c:44
else result
Definition: token2.c:20
Modified on Mon Dec 11 02:36:33 2023 by modify_doxy.py rev. 669887