NCBI C++ ToolKit
rpsblast_local.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: rpsblast_local.cpp 100101 2023-06-15 14:10:29Z merezhuk $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Amelia Fong
27  *
28  */
29 
30 /** @file rpsblast_local.cpp
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/ncbifile.hpp>
35 #include <corelib/ncbiapp.hpp>
38 #include <corelib/ncbithr.hpp>
39 #include <corelib/ncbitime.hpp>
40 #include <corelib/ncbimtx.hpp>
41 #include <corelib/ncbiexpt.hpp>
45 
47 BEGIN_SCOPE(blast)
48 
49 static const char delimiter[] = "#rps#";
50 static const size_t delimiter_len = sizeof(delimiter) - 1; // exclude \0
51 
52 
53 static void s_ConvertConcatStringToVectorOfString(const string & s, vector<string> & v)
54 {
55  size_t pos_start = 0;
56  while(1)
57  {
58  size_t pos_find = s.find(delimiter, pos_start);
59  if(pos_find == string::npos)
60  break;
61  size_t length = pos_find - pos_start;
62  v.push_back(s.substr(pos_start, length));
63  pos_start = pos_find + delimiter_len;
64  }
65 
66  v.push_back(s.substr(pos_start, s.size() - pos_start ));
67 }
68 
69 static void s_MergeAlignSet(CSeq_align_set & final_set, const CSeq_align_set & input_set)
70 {
71  CSeq_align_set::Tdata & final_list = final_set.Set();
72  const CSeq_align_set::Tdata & input_list = input_set.Get();
73 
74  CSeq_align_set::Tdata::const_iterator input_it = input_list.begin();
75  CSeq_align_set::Tdata::iterator final_it = final_list.begin();
76  while(input_it != input_list.end())
77  {
78  double final_evalue;
79  double input_evalue;
80 
81  (*final_it)->GetNamedScore(CSeq_align::eScore_EValue, final_evalue);
82  (*input_it)->GetNamedScore(CSeq_align::eScore_EValue, input_evalue);
83 
84  if(input_evalue == final_evalue)
85  {
86  //Pulling a trick here to keep the program flow simple
87  //Replace the final evalue with input bitscore and vice versa
88  (*final_it)->GetNamedScore(CSeq_align::eScore_BitScore, input_evalue);
89  (*input_it)->GetNamedScore(CSeq_align::eScore_BitScore, final_evalue);
90  }
91 
92  if(input_evalue < final_evalue)
93  {
94  CSeq_align_set::Tdata::const_iterator start_input_it = input_it;
95  while(1)
96  {
97  const CSeq_id & id_prev = (*input_it)->GetSeq_id(1);
98  input_it++;
99  if(input_it == input_list.end())
100  {
101  break;
102  }
103 
104  if(! id_prev.Match((*input_it)->GetSeq_id(1)))
105  {
106  break;
107  }
108  }
109 
110  final_list.insert(final_it, start_input_it, input_it);
111  }
112  else
113  {
114  while(1)
115  {
116  const CSeq_id & id_prev = (*final_it)->GetSeq_id(1);
117  final_it++;
118 
119  if(final_it == final_list.end())
120  {
121  break;
122  }
123 
124  if(! id_prev.Match((*final_it)->GetSeq_id(1)))
125  {
126  break;
127  }
128  }
129 
130  if(final_it == final_list.end())
131  {
132  final_list.insert(final_it, input_it, input_list.end());
133  break;
134  }
135  }
136  }
137 }
138 
139 static CRef<CSearchResultSet> s_CombineSearchSets(vector<CRef<CSearchResultSet> > & t, unsigned int num_of_threads)
140 {
141  CRef<CSearchResultSet> aggregate_search_result_set (new CSearchResultSet());
142  aggregate_search_result_set->clear();
143 
144  for(unsigned int i=0; i < t[0]->GetNumQueries(); i++)
145  {
146  vector< CRef<CSearchResults> > thread_results;
147  thread_results.push_back (CRef<CSearchResults> (&((*(t[0]))[i])));
148  const CSeq_id & id = *(thread_results[0]->GetSeqId());
149 
150  for(unsigned int d=1; d < num_of_threads; d++)
151  {
152  thread_results.push_back ((*(t[d]))[id]);
153  }
154 
155  CRef<CSeq_align_set> align_set(new CSeq_align_set);
156  TQueryMessages aggregate_messages;
157  for(unsigned int d=0; d< num_of_threads; d++)
158  {
159  if(thread_results[d]->HasAlignments())
160  {
161  CConstRef<CSeq_align_set> thread_align_set = thread_results[d]->GetSeqAlign();
162  if(align_set->IsEmpty())
163  {
164  align_set->Set().insert(align_set->Set().begin(),
165  thread_align_set->Get().begin(),
166  thread_align_set->Get().end());
167  }
168  else
169  {
170  s_MergeAlignSet(*align_set, *thread_align_set);
171  }
172  }
173  aggregate_messages.Combine(thread_results[d]->GetErrors());
174  }
175 
176  TMaskedQueryRegions query_mask;
177  thread_results[0]->GetMaskedQueryRegions(query_mask);
178  CRef<CSearchResults> aggregate_search_results (new CSearchResults(thread_results[0]->GetSeqId(),
179  align_set,
180  aggregate_messages,
181  thread_results[0]->GetAncillaryData(),
182  &query_mask,
183  thread_results[0]->GetRID()));
184  aggregate_search_result_set->push_back(aggregate_search_results);
185 
186  }
187 
188  return aggregate_search_result_set;
189 
190 }
191 
192 static void s_ModifyVolumePaths(vector<string> & rps_database)
193 {
194  for(unsigned int i=0; i < rps_database.size(); i++)
195  {
196  size_t found = rps_database[i].find(".pal");
197  if(string::npos != found)
198  rps_database[i]= rps_database[i].substr(0, found);
199  }
200 }
201 
202 static bool s_SortDbSize(const pair<string, Int8> & a, const pair<string, Int8> & b)
203 {
204  return(a.second > b.second);
205 }
206 
207 static void s_MapDbToThread(vector<string> & db, unsigned int num_of_threads)
208 {
209  unsigned int db_size = static_cast<unsigned int>(db.size());
210  vector <pair <string, Int8> > p;
211 
212  for(unsigned int i=0; i < db_size; i++)
213  {
214  vector<string> path;
215  CSeqDB::FindVolumePaths(db[i], CSeqDB::eProtein, path, NULL, true);
216  _ASSERT(path.size() == 1);
217  CFile f(path[0]+".loo");
218  Int8 length = f.GetLength();
219  _ASSERT(length > 0 );
220  //Scale down, just in case
221  p.push_back(make_pair(db[i], length/1000));
222  }
223 
224  sort(p.begin(), p.end(),s_SortDbSize);
225 
226  db.resize(num_of_threads);
227  vector<Int8> acc_size(num_of_threads, 0);
228 
229  for(unsigned char i=0; i < num_of_threads; i++)
230  {
231  db[i] = p[i].first;
232  acc_size[i] = p[i].second;
233  }
234 
235  for(unsigned int i= num_of_threads; i < db_size; i++)
236  {
237  unsigned int min_index = 0;
238  for(unsigned int j=1; j<num_of_threads; j++)
239  {
240  if(acc_size[j] < acc_size[min_index])
241  min_index = j;
242  }
243 
244  acc_size[min_index] += p[i].second;
245  db[min_index] = db[min_index] + delimiter + p[i].first;
246  }
247 
248 }
249 
250 CRef<CSearchResultSet> s_RunLocalRpsSearch(const string & db,
251  CBlastQueryVector & query_vector,
252  CRef<CBlastOptionsHandle> opt_handle)
253 {
254  CSearchDatabase search_db(db, CSearchDatabase::eBlastDbIsProtein);
255  CRef<CLocalDbAdapter> db_adapter(new CLocalDbAdapter(search_db));
256  CRef<IQueryFactory> queries(new CObjMgr_QueryFactory(query_vector));
257 
258  CLocalBlast lcl_blast(queries, opt_handle, db_adapter);
259  CRef<CSearchResultSet> results = lcl_blast.Run();
260 
261  return results;
262 }
263 
264 
265 class CRPSThread : public CThread
266 {
267 public:
268  CRPSThread(CRef<CBlastQueryVector> query_vector,
269  const string & db,
270  CRef<CBlastOptions> options);
271 
272  void * Main(void);
273 
274 private:
275  CRef<CSearchResultSet> RunTandemSearches(void);
276 
277  CRPSThread(const CRPSThread &);
278  CRPSThread & operator=(const CRPSThread &);
279 
280  vector<string> m_db;
281  CRef<CBlastOptionsHandle> m_opt_handle;
282  CRef<CBlastQueryVector> m_query_vector;
283 };
284 
285 /* CRPSThread */
286 
287 CRPSThread::CRPSThread(CRef<CBlastQueryVector> query_vector,
288  const string & db,
289  CRef<CBlastOptions> options):
290  m_query_vector(query_vector)
291 
292 {
293  m_opt_handle.Reset(new CBlastRPSOptionsHandle(options));
294 
295  s_ConvertConcatStringToVectorOfString(db, m_db);
296 }
297 
298 void* CRPSThread::Main(void)
299 {
300  CRef<CSearchResultSet> * result = new (CRef<CSearchResultSet>);
301  if(m_db.size() == 1)
302  {
303  *result = s_RunLocalRpsSearch(m_db[0],
304  *m_query_vector,
305  m_opt_handle);
306  }
307  else
308  {
309  *result = RunTandemSearches();
310  }
311  return result;
312 
313 }
314 
315 CRef<CSearchResultSet> CRPSThread::RunTandemSearches(void)
316 {
317  unsigned int num_of_db = static_cast<unsigned int>(m_db.size());
318  vector<CRef<CSearchResultSet> > results;
319 
320  for(unsigned int i=0; i < num_of_db; i++)
321  {
322  results.push_back(s_RunLocalRpsSearch(m_db[i],
323  *m_query_vector,
324  m_opt_handle));
325  }
326 
327  return s_CombineSearchSets(results, num_of_db);
328 }
329 
330 /* CThreadedRpsBlast */
331 CLocalRPSBlast::CLocalRPSBlast(CRef<CBlastQueryVector> query_vector,
332  const string & db,
333  CRef<CBlastOptionsHandle> options,
334  unsigned int num_of_threads):
335  m_num_of_threads(num_of_threads),
336  m_db_name(db),
337  m_opt_handle(options),
338  m_query_vector(query_vector),
339  m_num_of_dbs(0)
340 {
341  CSeqDB::FindVolumePaths(db, CSeqDB::eProtein, m_rps_databases, NULL, true, true);
342  m_num_of_dbs = static_cast<unsigned int>(m_rps_databases.size());
343  if( 1 == m_num_of_dbs)
344  {
345  m_num_of_threads = kDisableThreadedSearch;
346  }
347 }
348 
349 void CLocalRPSBlast::x_AdjustDbSize(void)
350 {
351  if(m_opt_handle->GetOptions().GetEffectiveSearchSpace()!= 0)
352  return;
353 
354  if(m_opt_handle->GetOptions().GetDbLength()!= 0)
355  return;
356 
357  CSeqDB db(m_db_name, CSeqDB::eProtein);
358 
359  Uint8 db_size = db.GetTotalLengthStats();
360  int num_seq = db.GetNumSeqsStats();
361 
362  if(0 == db_size)
363  db_size = db.GetTotalLength();
364 
365  if(0 == num_seq)
366  num_seq = db.GetNumSeqs();
367 
368  m_opt_handle->SetOptions().SetDbLength(db_size);
369  m_opt_handle->SetOptions().SetDbSeqNum(num_seq);
370 
371  return;
372 }
373 
374 CRef<CSearchResultSet> CLocalRPSBlast::Run(void)
375 {
376  if(1 != m_num_of_dbs)
377  {
378  x_AdjustDbSize();
379  }
380 
381  if(kDisableThreadedSearch == m_num_of_threads)
382  {
383  if(1 == m_num_of_dbs)
384  {
385  return s_RunLocalRpsSearch(m_db_name, *m_query_vector, m_opt_handle);
386  }
387  else
388  {
389  s_ModifyVolumePaths(m_rps_databases);
390 
391  vector<CRef<CSearchResultSet> > results;
392  for(unsigned int i=0; i < m_num_of_dbs; i++)
393  {
394  results.push_back(s_RunLocalRpsSearch(m_rps_databases[i],
395  *m_query_vector,
396  m_opt_handle));
397 
398  }
399  return s_CombineSearchSets(results, m_num_of_dbs);
400  }
401 
402  }
403  else
404  {
405  return RunThreadedSearch();
406  }
407 }
408 
409 CRef<CSearchResultSet> CLocalRPSBlast::RunThreadedSearch(void)
410 {
411 
412  s_ModifyVolumePaths(m_rps_databases);
413 
414  if((kAutoThreadedSearch == m_num_of_threads) ||
415  (m_num_of_threads > m_rps_databases.size()))
416  {
417  //Default num of thread : a thread for each db
418  m_num_of_threads = static_cast<unsigned int>(m_rps_databases.size());
419  }
420  else if(m_num_of_threads < m_rps_databases.size())
421  {
422  // Combine databases, modified the size of rps_database
423  s_MapDbToThread(m_rps_databases, m_num_of_threads);
424  }
425 
426  vector<CRef<CSearchResultSet> * > thread_results(m_num_of_threads, NULL);
427  vector <CRPSThread* > thread(m_num_of_threads, NULL);
428  vector<CRef<CSearchResultSet> > results;
429 
430  for(unsigned int t=0; t < m_num_of_threads; t++)
431  {
432  // CThread destructor is protected, all threads destory themselves when terminated
433  thread[t] = (new CRPSThread(m_query_vector, m_rps_databases[t], m_opt_handle->SetOptions().Clone()));
434  thread[t]->Run();
435  }
436 
437  for(unsigned int t=0; t < m_num_of_threads; t++)
438  {
439  thread[t]->Join(reinterpret_cast<void**> (&thread_results[t]));
440  }
441 
442  for(unsigned int t=0; t < m_num_of_threads; t++)
443  {
444  results.push_back(*(thread_results[t]));
445  }
446 
447  CRef<CBlastRPSInfo> rpsInfo = CSetupFactory::CreateRpsStructures(m_db_name,
448  CRef<CBlastOptions> (&(m_opt_handle->SetOptions())));
449  return s_CombineSearchSets(results, m_num_of_threads);
450 
451 }
452 
453 
454 
455 END_SCOPE(blast)
456 END_NCBI_SCOPE
#define static
Declares the CBlastRPSOptionsHandle class.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
Main class to perform a BLAST search on the local machine.
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Defines NCBI C++ exception handling.
Defines classes: CDirEntry, CFile, CDir, CSymLink, CMemoryFile, CFileUtil, CFileLock,...
Multi-threading – mutexes; rw-locks; semaphore.
Multi-threading – classes, functions, and features.
Defines: CTimeFormat - storage class for time format.
NOTE: This file contains work in progress and the APIs are likely to change, please do not rely on th...
static const char delimiter[]
Declares the CLocalRPSBlast class.
Defines BLAST database access classes.
#define const
Definition: zconf.h:230
Modified on Wed Nov 29 02:24:03 2023 by modify_doxy.py rev. 669887