58 #define TESTAPP_LOG_POST(x) do { ++m_LogMsgCount; LOG_POST(x); } while (0)
60 #define TESTAPP_ASSERT(expr, msg) \
63 cerr << "Assertion failed: (" << #expr << ") --- " << msg << endl; \
103 virtual void OnExit(
void);
107 #ifdef USE_NATIVE_THREADS
111 void RunNative(
void);
112 void JoinNative(
void**
result);
132 "CTestThread::CTestThread() - failed to initialize thread " <<
m_Idx);
141 "CTestThread::~CTestThread() - invalid number of threads: " << num);
144 "CTestThread::~CTestThread() - failed to destroy thread " <<
m_Idx);
152 "CTestThread::OnExit() - error exiting thread " <<
m_Idx);
174 #ifdef USE_NATIVE_THREADS
184 #if defined(NCBI_POSIX_THREADS)
189 return NativeWrapper(arg);
192 #elif defined(NCBI_WIN32_THREADS)
197 return NativeWrapper(arg);
203 void CTestThread::RunNative(
void)
208 #if defined(NCBI_WIN32_THREADS)
212 DWORD creation_flags = 0;
214 this, creation_flags, &thread_id);
220 0,
FALSE, DUPLICATE_SAME_ACCESS),
221 "CTestThread::RunNative() - failed to duplicate thread handle");
223 "CTestThread::RunNative() - failed to close thread handle");
224 #elif defined(NCBI_POSIX_THREADS)
227 "CTestThread::RunNative() - failed to init thread attributes");
229 NativeWrapperCaller,
this) == 0,
230 "CTestThread::RunNative() - failed to create thread");
232 "CTestThread::RunNative() - failed to destroy thread attributes");
238 TESTAPP_ASSERT(0,
"CTestThread::RunNative() - threads are not supported");
244 void CTestThread::JoinNative(
void**
result)
247 #if defined(NCBI_WIN32_THREADS)
249 "CTestThread::JoinNative() - failed to join thread");
252 && status !=
DWORD(STILL_ACTIVE),
253 "CTestThread::JoinNative() - failed to get thread exit code");
255 "CTestThread::JoinNative() - failed to close thread handle");
257 #elif defined(NCBI_POSIX_THREADS)
259 "CTestThread::JoinNative() - failed to join thread");
283 for (
int i = first_idx;
i < first_idx + spawn_max;
i++) {
286 #ifdef USE_NATIVE_THREADS
317 virtual void*
Main(
void);
325 unsigned int number_of_threads,
326 bool has_sync_point);
375 unsigned int number_of_threads,
377 : m_Number_of_threads(number_of_threads), m_Has_sync_point(has_sync_point),
378 m_Semaphore(0,number_of_threads), m_SyncCounter(0)
382 #ifdef USE_NATIVE_THREADS
405 bool reached =
false;
431 "CThreadGroup::ThreadWait() - invalid number of threads: " << num);
470 arg_desc->AddDefaultKey
471 (
"threads",
"NumThreads",
472 "Total number of threads to create and run",
474 arg_desc->SetConstraint
478 arg_desc->AddDefaultKey
479 (
"repeats",
"NumRepeats",
480 "In non-MT mode only(!) -- how many times to repeat the test. "
481 "If passed 0, then the value of argument `-threads' will be used.",
483 arg_desc->SetConstraint
487 arg_desc->AddDefaultKey
488 (
"spawnby",
"SpawnBy",
489 "Threads spawning factor",
491 arg_desc->SetConstraint
494 arg_desc->AddOptionalKey(
"seed",
"Randomization",
495 "Randomization seed value",
498 arg_desc->SetUsageContext(
GetArguments().GetProgramBasename(),
499 "MT-environment test");
513 #if !defined(NCBI_THREADS)
519 #if !defined(NCBI_THREADS)
521 if (!args[
"repeats"].AsInteger()) {
533 "CThreadedApp::Run() - failed to initialize application");
536 ?
static_cast<unsigned int>(
GetArgs()[
"seed"].AsInteger())
538 static_cast<unsigned int>(time(
NULL)) % 1000000);
542 unsigned int threshold =
NCBI_PARAM_TYPE(TEST_MT, Cascading)::GetDefault();
543 if (threshold > 100) {
544 ERR_FATAL(
"Cascading threshold must be less than 100");
546 bool cascading = (
static_cast<unsigned int>(rand() % 100)) < threshold;
547 #if !defined(NCBI_THREADS)
556 #if defined(NCBI_THREADS)
581 "CThreadedApp::Run() - thread " <<
i <<
" has not started");
582 #ifdef USE_NATIVE_THREADS
584 thr[
i]->JoinNative(&join_result);
586 "CThreadedApp::Run() - thread " <<
i <<
" failed to pass result to Join()");
589 thr[
i]->Join(&join_result);
591 "CThreadedApp::Run() - thread " <<
i <<
" failed to pass result to Join()");
598 for (
unsigned int t = 0;
601 thr[
i]->Join(&join_result);
603 "CThreadedApp::Run() - thread " <<
i <<
" failed to pass result to Join()");
607 "CThreadedApp::Run() - ivalid number of started threads: " <<
m_Reached.
size());
611 "CThreadedApp::Run() - error exiting application");
635 ERR_FATAL(
"Thread groups with no threads are not allowed");
638 unsigned int threshold =
640 if (threshold > 100) {
641 ERR_FATAL(
"IntragroupSyncPoint threshold must be less than 100");
644 for (
unsigned int g = 0;
g < count; ++
g) {
654 for (
unsigned int t = 0;
t < threads_left; ++
t) {
671 for (
unsigned int g = 0;
g < count; ++
g) {
691 const unsigned int count =
static_cast<unsigned int>(
m_ThreadGroups.size());
692 unsigned int start_now = count;
701 for (
g = 1;
g < count; ++
g) {
702 unsigned int dest = rand() % (
m_Max+1);
710 os <<
"Delayed thread groups: " << (count - start_now)
711 <<
", starting order: " << start_now;
769 thr[idx]->SyncPoint();
779 "because TestApp_GlobalSyncPoint() was called");
787 thr[idx]->GlobalSyncPoint();
792 unsigned int num_min,
unsigned int num_max)
CInGroupThread(CThreadGroup &group, int id)
virtual void SyncPoint(void)
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CNcbiOstrstreamToString class helps convert CNcbiOstrstream to a string Sample usage:
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
virtual void GlobalSyncPoint(void)
virtual void * Main(void)
Derived (user-created) class must provide a real thread function.
CTestThread(CSeqDBAtlas &atlas)
static void StartCascadingThreads(void)
virtual void SyncPoint(void)
virtual void OnExit(void)
Override this to execute finalization code.
unsigned int m_Number_of_threads
unsigned int m_SyncCounter
CThreadGroup(unsigned int number_of_threads, bool has_sync_point)
void ThreadComplete(void)
iterator_bool insert(const value_type &val)
const_iterator find(const key_type &key) const
const_iterator end() const
virtual const CArgs & GetArgs(void) const
Get parsed command line arguments.
virtual void SetupArgDescriptions(CArgDescriptions *arg_desc)
Setup the command line argument descriptions.
const CNcbiArguments & GetArguments(void) const
Get the application's cached unprocessed command-line arguments.
@ eInteger
Convertible into an integer number (int or Int8)
void Set(TValue new_value) THROWS_NONE
Set atomic counter value.
TValue Add(int delta) THROWS_NONE
Atomically add value (=delta), and return new counter value.
TValue Get(void) const THROWS_NONE
Get atomic counter value.
#define ERR_FATAL(message)
Posting fatal error and abort.
const unsigned int k_NumThreadsMin
Minimum number of threads.
vector< SThreadGroup > m_ThreadGroups
const unsigned int k_NumThreadsMax
Maximum number of threads.
unsigned int x_InitializeDelayedStart(void)
CThreadedApp(void)
Constructor.
virtual bool TestApp_Exit(void)
Override this method to execute code after all threads terminate.
unsigned int m_LogMsgCount
const int k_SpawnByMax
Maximum number of spawn by threads.
void SetNumberOfDelayedStartSyncPoints(unsigned int num_min, unsigned int num_max)
Set allowed range of delayed-start sync points.
~CThreadedApp(void)
Destructor.
virtual bool Thread_Run(int idx)
Run the thread.
int Run(void)
Run the threads.
virtual bool Thread_Destroy(int idx)
Destroy the thread.
void Init(void)
Initialize the app.
unsigned int s_NumThreads
virtual bool Thread_Init(int idx)
Initialize the thread.
const int k_SpawnByMin
Minimum number of spawn by threads.
volatile unsigned int m_NextGroup
virtual bool Thread_Exit(int idx)
Exit the thread.
unsigned int number_of_threads
Number of threads in the group.
void TestApp_IntraGroupSyncPoint(void)
Wait until other threads in group reach the same sync point The call may be ignored by the applicatio...
void x_PrintThreadGroups(void)
virtual bool TestApp_Init(void)
Override this method to execute code before running threads.
void TestApp_GlobalSyncPoint(void)
Wait until ALL threads reach a call of TestApp_GlobalSyncPoint().
void x_InitializeThreadGroups(void)
vector< unsigned int > m_Delayed
void TestApp_DelayedStartSyncPoint(const string &name)
Start threads that belong to delayed group (this call may be ignored to provide more randomness).
void x_StartThreadGroup(unsigned int count)
bool has_sync_point
TRUE, if the group has intra-group sync point.
virtual bool TestApp_Args(CArgDescriptions &args)
Override this method to add your custom arguments.
#define NCBI_PARAM_TYPE(section, name)
Generate typename for a parameter from its {section, name} attributes.
static TPid GetPid(void)
Get process identifier (pid) for the current process.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
static string SizetToString(size_t value, TNumToStringFlags flags=0, int base=10)
Convert size_t to string.
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
static string UIntToString(unsigned int value, TNumToStringFlags flags=0, int base=10)
Convert UInt to string.
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
void * TWrapperArg
Define platform-dependent argument wrapper.
TThreadHandle m_Handle
platform-dependent thread handle
void Lock(void)
Acquire mutex for the current thread with no nesting checks.
pthread_t TThreadHandle
Define platform-dependent thread handle type.
void Wait(void)
Wait on semaphore.
static void InitializeMainThreadId(void)
Initialize main thread's TID.
static TWrapperRes Wrapper(TWrapperArg arg)
Function to use (internally) as the thread's startup function.
void * TWrapperRes
Define platform-dependent result wrapper.
void Post(unsigned int count=1)
Increment the semaphore by "count".
void Unlock(void)
Release mutex with no owner or nesting checks.
@ fRunAllowST
Allow threads to run in single thread builds.
#define HANDLE
An abstraction for a file handle.
unsigned int
A callback function used to compare two keys in a database.
void SleepMilliSec(unsigned long ml_sec, EInterruptOnSignal onsignal=eRestartOnSignal)
Multi-threading – mutexes; rw-locks; semaphore.
TWrapperRes(* FSystemWrapper)(TWrapperArg)
Thread group description parameters.
static CMiniMutex s_GlobalLock
static string s_GroupsCount(void)
NCBI_PARAM_DEF(unsigned int, TEST_MT, Cascading, 25)
DEFINE_STATIC_FAST_MUTEX(s_GlobalLock)
static CThreadedApp * s_Application
CRef< CTestThread > thr[k_NumThreadsMax]
static CAtomicCounter s_SyncCounter
NCBI_PARAM_DECL(unsigned int, TEST_MT, Cascading)
NCBI_PARAM_DEF_WITH_INIT(string, TEST_MT, GroupsCount, "", s_GroupsCount)
#define TESTAPP_LOG_POST(x)
static CSemaphore s_Semaphore(0, INT_MAX)
#define TESTAPP_ASSERT(expr, msg)
static volatile unsigned int s_NextIndex
static CAtomicCounter s_NumberOfThreads
static CStaticTls< int > s_ThreadIdxTLS
static CRef< CThreadGroup > thr_group[k_NumThreadsMax]
Wrapper for testing modules in MT environment.
int g(Seg_Gsm *spe, Seq_Mtf *psm, Thd_Gsm *tdg)