NCBI C++ ToolKit
blast_node.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id:
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Amelia Fong
27  *
28  */
29 
30 /** @file blast_node.cpp
31  * BLAST node api
32  */
33 
34 #include <ncbi_pch.hpp>
35 #include <corelib/ncbiapp.hpp>
39 
40 #if defined(NCBI_OS_UNIX)
41 #include <unistd.h>
42 #endif
43 
44 #ifndef SKIP_DOXYGEN_PROCESSING
46 USING_SCOPE(blast);
48 #endif
49 static void
50 s_UnregisterDataLoader(const string & dbloader_prefix)
51 {
54  om->GetRegisteredNames(loader_names);
55  ITERATE(CObjectManager::TRegisteredNames, loader_name, loader_names) {
56  if (NStr::Find(*loader_name, dbloader_prefix) != NPOS) {
57  try {
58  if(om->RevokeDataLoader(*loader_name)){
59  _TRACE("Unregistered Data Loader: " + *loader_name);
60  }
61  else {
62  _TRACE("Failed to Unregistered Data Loader: " + *loader_name);
63  }
64  }
65  catch(CException &) {
66  _TRACE("Failed to unregister data loader: " + *loader_name);
67  }
68  }
69  }
70 }
71 
73 {
74  CFastMutexGuard guard(m_Mutex);
75  m_MsgQueue.push_back(msg);
77 }
78 
79 CBlastNode::CBlastNode (int node_num, const CNcbiArguments & ncbi_args, const CArgs& args,
80  CBlastAppDiagHandler & bah, int query_index, int num_queries, CBlastNodeMailbox * mailbox):
81  m_NodeNum(node_num), m_NcbiArgs(ncbi_args), m_Args(args),
82  m_Bah(bah), m_QueryIndex(query_index), m_NumOfQueries(num_queries),
83  m_QueriesLength(0), m_DataLoaderPrefix(kEmptyStr)
84 {
85  if(mailbox != NULL) {
86  m_Mailbox.Reset(mailbox);
87  }
88  string p("Query ");
89  p+=NStr::IntToString(query_index) + "-" + NStr::IntToString(query_index + num_queries -1);
90  m_NodeIdStr = p;
91 }
92 
94 
97  }
98  if(m_Mailbox.NotEmpty()) {
99  m_Mailbox.Reset();
100  }
101 }
102 
104 {
105  if (m_Mailbox.NotEmpty()) {
106  CRef<CBlastNodeMsg> m( new CBlastNodeMsg(msg_type, ptr));
107  m_Mailbox->SendMsg(m);
108  }
109 }
110 
111 void
113 {
114  static const string kPrefixThread = "BLASTDB_THREAD";
115  int t_id = CThread::GetSelf();
116  if (t_id !=0) {
118  }
119 }
120 
121 CBlastMasterNode::CBlastMasterNode(CNcbiOstream & out_stream, int num_threads):
122  m_OutputStream(out_stream), m_MaxNumThreads(num_threads), m_MaxNumNodes(num_threads + 2),
123  m_NumErrStatus(0), m_NumQueries(0), m_QueriesLength(0)
124 {
125  m_StopWatch.Start();
126 }
127 
128 void
130 {
131  CFastMutexGuard guard(m_Mutex);
133 }
134 
135 void
137 {
138  if(node == NULL) {
139  NCBI_THROW(CBlastException, eInvalidArgument, "Empty Node" );
140  }
141  if(mailbox == NULL) {
142  NCBI_THROW(CBlastException, eInvalidArgument, "Empty mailbox" );
143  }
144  if(mailbox->GetNodeNum() != node->GetNodeNum()) {
145  NCBI_THROW(CBlastException, eCoreBlastError, "Invalid mailbox node number" );
146  }
147  {
148  CFastMutexGuard guard(m_Mutex);
149  int node_num = node->GetNodeNum();
150  if ((m_PostOffice.find(node_num) != m_PostOffice.end()) ||
151  (m_RegisteredNodes.find(node_num) != m_RegisteredNodes.end())){
152  NCBI_THROW(CBlastException, eInvalidArgument, "Duplicate chunk num" );
153  }
154  m_PostOffice[node_num]= mailbox;
155  m_RegisteredNodes[node_num] = node;
156  }
157 }
158 
160 {
162  if(itr->second->GetNumMsgs() > 0) {
163  CRef<CBlastNodeMsg> msg = itr->second->ReadMsg();
164  int chunk_num = itr->first;
165  if (msg.NotEmpty()) {
166  switch (msg->GetMsgType()) {
168  {
169  if ((int) m_ActiveNodes.size() < m_MaxNumThreads) {
170  CBlastNode * n = (CBlastNode *) msg->GetMsgBody();
171  if(n != NULL) {
172  double start_time = m_StopWatch.Elapsed();
173  n->Run();
174  pair< int, double > p(chunk_num, start_time);
176  CRef<CBlastNodeMsg> empty_msg;
177  pair<int,CRef<CBlastNodeMsg> > m(chunk_num, empty_msg);
179  INFO_POST("Starting Chunk # " << chunk_num << " " << n->GetNodeIdStr());
180  }
181  else {
182  NCBI_THROW(CBlastException, eCoreBlastError, "Invalid mailbox node number" );
183  }
184  }
185  else {
186  itr->second->UnreadMsg(msg);
187  FormatResults();
188  if (IsFull()) {
190  }
191  return true;
192  }
193  break;
194  }
197  {
198  m_FormatQueue[itr->first] = msg;
199  double diff = m_StopWatch.Elapsed() - m_ActiveNodes[itr->first];
200  m_ActiveNodes.erase(chunk_num);
201  CTimeSpan s(diff);
202  INFO_POST("Chunk #" << chunk_num << " completed in " << s.AsSmartString());
203  break;
204  }
206  {
207  break;
208  }
209  default:
210  {
211  NCBI_THROW(CBlastException, eCoreBlastError, "Invalid node message type");
212  break;
213  }
214  }
215  }
216  }
217  }
218  FormatResults();
219  return IsActive();
220 }
221 
223 {
225 
226  while (itr != m_FormatQueue.end()){
227  CRef<CBlastNodeMsg> msg(itr->second);
228  if(msg.Empty()) {
229  break;
230  }
231  CBlastNode * n = (CBlastNode *) msg->GetMsgBody();
232  if(n == NULL) {
233  string err_msg = "Empty formatting msg for chunk num # " + NStr::IntToString(itr->first);
234  NCBI_THROW(CBlastException, eCoreBlastError, err_msg);
235  }
236  int node_num = n->GetNodeNum();
237  if (msg->GetMsgType() == CBlastNodeMsg::ePostResult) {
238  n->GetBlastResults(m_OutputStream);
239  }
240  else if (msg->GetMsgType() == CBlastNodeMsg::eErrorExit) {
241  m_NumErrStatus++;
242  ERR_POST("Chunk # " << node_num << " exit with error (" << n->GetStatus() << ")");
243  }
244  else {
245  NCBI_THROW(CBlastException, eCoreBlastError, "Invalid msg type");
246  }
247  m_NumQueries += n->GetNumOfQueries();
248  m_QueriesLength += n->GetQueriesLength();
249  n->Detach();
250  m_PostOffice.erase(node_num);
251  m_RegisteredNodes.erase(node_num);
252 
253  itr++;
254  }
255 
256  if (itr != m_FormatQueue.begin()) {
258  }
259 }
260 
262 {
263  TRegisteredNodes::reverse_iterator rr = m_RegisteredNodes.rbegin();
264  TActiveNodes::reverse_iterator ra = m_ActiveNodes.rbegin();
265  unsigned int in_buffer = m_MaxNumThreads;
266  if ((!m_RegisteredNodes.empty()) && (!m_ActiveNodes.empty())) {
267  in_buffer = rr->first - ra->first;
268  }
269  return ((int) (m_ActiveNodes.size() + in_buffer) >= m_MaxNumNodes);
270 }
271 
272 
273 bool s_IsSeqID(string & line)
274 {
275  static const int kMainAccSize = 32;
276  size_t digit_pos = line.find_last_of("0123456789|", kMainAccSize);
277  if (digit_pos != NPOS) {
278  return true;
279  }
280 
281  return false;
282 }
283 
284 int
285 CBlastNodeInputReader::GetQueryBatch(string & queries, int & query_no)
286 {
287  CNcbiOstrstream ss;
288  int q_size = 0;
289  int q_count = 0;
290  queries.clear();
291  query_no = -1;
292 
293  while ( !AtEOF()) {
294  string line = NStr::TruncateSpaces_Unsafe(*++(*this), NStr::eTrunc_Begin);
295  if (line.empty()) {
296  continue;
297  }
298  char c =line[0];
299  if (c == '!' || c == '#' || c == ';') {
300  continue;
301  }
302  bool isId = s_IsSeqID(line);
303  if ( isId || ( c == '>' )) {
304  if (q_size >= m_QueryBatchSize) {
305  UngetLine();
306  break;
307  }
308  q_count ++;
309  }
310  if (c != '>') {
311  q_size += isId? m_EstAvgQueryLength : line.size();
312  }
313  ss << line << endl;
314  }
315  ss.flush();
316  if (q_count > 0){
317  queries = ss.str();
318  query_no = m_QueryCount +1;
319  m_QueryCount +=q_count;
320  }
321  return q_count;
322 }
static const string kPrefixThread
Definition: bdbloader.cpp:151
Interface for reading SRA sequences into blast input.
USING_SCOPE(blast)
static void s_UnregisterDataLoader(const string &dbloader_prefix)
Definition: blast_node.cpp:50
USING_NCBI_SCOPE
Definition: blast_node.cpp:45
bool s_IsSeqID(string &line)
Definition: blast_node.cpp:273
BLAST node api.
CArgs –.
Definition: ncbiargs.hpp:379
Class to capture message from diag handler.
Definition: blast_aux.hpp:249
Defines BLAST error codes (user errors included)
CFastMutex m_Mutex
Definition: blast_node.hpp:164
TActiveNodes m_ActiveNodes
Definition: blast_node.hpp:168
CConditionVariable m_NewEvent
Definition: blast_node.hpp:170
void x_WaitForNewEvent()
Definition: blast_node.cpp:129
CStopWatch m_StopWatch
Definition: blast_node.hpp:165
CBlastMasterNode(CNcbiOstream &out_stream, int num_threads)
Definition: blast_node.cpp:121
TPostOffice m_PostOffice
Definition: blast_node.hpp:166
void RegisterNode(CBlastNode *node, CBlastNodeMailbox *mailbox)
Definition: blast_node.cpp:136
TRegisteredNodes m_RegisteredNodes
Definition: blast_node.hpp:167
TFormatQueue m_FormatQueue
Definition: blast_node.hpp:169
CNcbiOstream & m_OutputStream
Definition: blast_node.hpp:161
const int m_QueryBatchSize
Definition: blast_node.hpp:187
const int m_EstAvgQueryLength
Definition: blast_node.hpp:188
int GetQueryBatch(string &queries, int &query_no)
Definition: blast_node.cpp:285
CFastMutex m_Mutex
Definition: blast_node.hpp:83
list< CRef< CBlastNodeMsg > > m_MsgQueue
Definition: blast_node.hpp:82
void SendMsg(CRef< CBlastNodeMsg > msg)
Definition: blast_node.cpp:72
CConditionVariable & m_Notify
Definition: blast_node.hpp:81
EMsgType GetMsgType()
Definition: blast_node.hpp:53
void * GetMsgBody()
Definition: blast_node.hpp:54
string m_NodeIdStr
Definition: blast_node.hpp:123
string m_DataLoaderPrefix
Definition: blast_node.hpp:128
void SendMsg(CBlastNodeMsg::EMsgType msg_type, void *ptr=NULL)
Definition: blast_node.cpp:103
CRef< CBlastNodeMailbox > m_Mailbox
Definition: blast_node.hpp:124
void SetDataLoaderPrefix()
Definition: blast_node.cpp:112
int GetNodeNum()
Definition: blast_node.hpp:99
virtual ~CBlastNode(void)
Definition: blast_node.cpp:93
CBlastNode(int node_num, const CNcbiArguments &ncbi_args, const CArgs &args, CBlastAppDiagHandler &bah, int query_index, int num_queries, CBlastNodeMailbox *mailbox)
Definition: blast_node.cpp:79
CNcbiArguments –.
Definition: ncbienv.hpp:236
CTimeSpan.
Definition: ncbitime.hpp:1313
void erase(iterator pos)
Definition: map.hpp:167
size_type size() const
Definition: map.hpp:148
const_iterator begin() const
Definition: map.hpp:151
const_iterator end() const
Definition: map.hpp:152
iterator_bool insert(const value_type &val)
Definition: map.hpp:165
bool empty() const
Definition: map.hpp:149
const_iterator find(const key_type &key) const
Definition: map.hpp:153
static time_t start_time
Definition: timeout.c:14
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
#define NULL
Definition: ncbistd.hpp:225
#define _TRACE(message)
Definition: ncbidbg.hpp:122
#define INFO_POST(message)
Definition: ncbidiag.hpp:201
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void UngetLine(void)
Unget current line, which must be valid.
bool AtEOF(void) const
Indicates (negatively) whether there is any more input.
static CRef< CObjectManager > GetInstance(void)
Return the existing object manager or create one.
vector< string > TRegisteredNames
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
bool NotEmpty(void) const THROWS_NONE
Check if CRef is not empty – pointing to an object and has a non-null value.
Definition: ncbiobj.hpp:726
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
Definition: ncbiobj.hpp:719
IO_PREFIX::ostream CNcbiOstream
Portable alias for ostream.
Definition: ncbistre.hpp:149
static CTempString TruncateSpaces_Unsafe(const CTempString str, ETrunc where=eTrunc_Both)
Truncate spaces in a string.
Definition: ncbistr.cpp:3191
#define kEmptyStr
Definition: ncbistr.hpp:123
#define NPOS
Definition: ncbistr.hpp:133
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5084
static SIZE_TYPE Find(const CTempString str, const CTempString pattern, ECase use_case=eCase, EDirection direction=eForwardSearch, SIZE_TYPE occurrence=0)
Find the pattern in the string.
Definition: ncbistr.cpp:2891
@ eTrunc_Begin
Truncate leading spaces only.
Definition: ncbistr.hpp:2240
static TID GetSelf(void)
Definition: ncbithr.cpp:515
void SignalSome(void)
Wake at least one of the threads that are currently waiting on this condition variable (if any thread...
Definition: ncbimtx.cpp:2594
bool WaitForSignal(CMutex &mutex, const CDeadline &deadline=CDeadline::eInfinite)
Release mutex and lock the calling thread until the condition variable is signalled.
Definition: ncbimtx.cpp:2554
double Elapsed(void) const
Return time elapsed since first Start() or last Restart() call (in seconds).
Definition: ncbitime.hpp:2775
string AsSmartString(ESmartStringPrecision precision, ERound rounding, ESmartStringZeroMode zero_mode=eSSZ_SkipZero) const
Transform time span to "smart" string.
Definition: ncbitime.hpp:2688
void Start(void)
Start the timer.
Definition: ncbitime.hpp:2764
yy_size_t n
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Declares the CRemoteBlast class.
CRef< objects::CObjectManager > om
Modified on Sun May 19 04:45:05 2024 by modify_doxy.py rev. 669887