NCBI C++ ToolKit
vdbblast_local.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: vdbblast_local.cpp 101102 2023-10-30 13:07:22Z fongah2 $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Amelia Fong
27  *
28  */
29 
30 /** @file vdbblast_local.cpp
31  */
32 
33 #include <ncbi_pch.hpp>
34 #include <corelib/ncbifile.hpp>
35 #include <corelib/ncbiapp.hpp>
40 #include <corelib/ncbithr.hpp>
41 #include <corelib/ncbitime.hpp>
42 #include <corelib/ncbimtx.hpp>
43 #include <corelib/ncbiexpt.hpp>
48 #ifdef _OPENMP
49 #include <omp.h>
50 #endif
52 BEGIN_SCOPE(blast)
53 
54 const string k_NOT_CSRA_DB("NOT_CSRA");
55 const string k_CSRA_CHUNK("CSRA_CHUNK: ");
56 
57 static void s_MergeAlignSet(CSeq_align_set & final_set, const CSeq_align_set & input_set, const int list_size)
58 {
59  CSeq_align_set::Tdata & final_list = final_set.Set();
60  const CSeq_align_set::Tdata & input_list = input_set.Get();
61 
62  CSeq_align_set::Tdata::const_iterator input_it = input_list.begin();
63  CSeq_align_set::Tdata::iterator final_it = final_list.begin();
64  int hit_count = 0;
65  while(input_it != input_list.end())
66  {
67  double final_evalue;
68  double input_evalue;
69 
70  if(hit_count >= list_size)
71  {
72  final_list.erase(final_it, final_list.end());
73  break;
74  }
75 
76  if(final_it == final_list.end())
77  {
78  // Want to keep append value to list until hit list is full or input list reaches the end
79  final_evalue = (double) kMax_UI8;
80  }
81  else
82  {
83  (*final_it)->GetNamedScore(CSeq_align::eScore_EValue, final_evalue);
84  }
85  (*input_it)->GetNamedScore(CSeq_align::eScore_EValue, input_evalue);
86 
87  if(input_evalue == final_evalue)
88  {
89  //Pulling a trick here to keep the program flow simple
90  //Replace the final evalue with input bitscore and vice versa
91  (*final_it)->GetNamedScore(CSeq_align::eScore_BitScore, input_evalue);
92  (*input_it)->GetNamedScore(CSeq_align::eScore_BitScore, final_evalue);
93  }
94 
95  if(input_evalue < final_evalue)
96  {
97  CSeq_align_set::Tdata::const_iterator start_input_it = input_it;
98  while(1)
99  {
100  const CSeq_id & id_prev = (*input_it)->GetSeq_id(1);
101  input_it++;
102  if(input_it == input_list.end())
103  {
104  break;
105  }
106 
107  if(! id_prev.Match((*input_it)->GetSeq_id(1)))
108  {
109  break;
110  }
111  }
112 
113  final_list.insert(final_it, start_input_it, input_it);
114  }
115  else
116  {
117  while(1)
118  {
119  const CSeq_id & id_prev = (*final_it)->GetSeq_id(1);
120  final_it++;
121 
122  if(final_it == final_list.end())
123  {
124  break;
125  }
126 
127  if(! id_prev.Match((*final_it)->GetSeq_id(1)))
128  {
129  break;
130  }
131  }
132  }
133  hit_count ++;
134  }
135 
136 }
137 
138 
139 static CRef<CSearchResultSet> s_CombineSearchSets(vector<CRef<CSearchResultSet> > & t, unsigned int num_of_threads, const int list_size)
140 {
141  CRef<CSearchResultSet> aggregate_search_result_set (new CSearchResultSet());
142  aggregate_search_result_set->clear();
143 
144  for(unsigned int i=0; i < t[0]->GetNumQueries(); i++)
145  {
146  vector< CRef<CSearchResults> > thread_results;
147  thread_results.push_back (CRef<CSearchResults> (&((*(t[0]))[i])));
148  const CSeq_id & id = *(thread_results[0]->GetSeqId());
149 
150  for(unsigned int d=1; d < num_of_threads; d++)
151  {
152  thread_results.push_back ((*(t[d]))[id]);
153  }
154 
155  CRef<CSeq_align_set> align_set(new CSeq_align_set);
156  TQueryMessages aggregate_messages;
157  for(unsigned int d=0; d< num_of_threads; d++)
158  {
159  if(thread_results[d]->HasAlignments())
160  {
161  CConstRef<CSeq_align_set> thread_align_set = thread_results[d]->GetSeqAlign();
162  if(align_set->IsEmpty())
163  {
164  align_set->Set().insert(align_set->Set().begin(),
165  thread_align_set->Get().begin(),
166  thread_align_set->Get().end());
167  }
168  else
169  {
170  s_MergeAlignSet(*align_set, *thread_align_set, list_size);
171  }
172  }
173  aggregate_messages.Combine(thread_results[d]->GetErrors());
174  }
175 
176  TMaskedQueryRegions query_mask;
177  thread_results[0]->GetMaskedQueryRegions(query_mask);
178  CRef<CSearchResults> aggregate_search_results (new CSearchResults(thread_results[0]->GetSeqId(),
179  align_set,
180  aggregate_messages,
181  thread_results[0]->GetAncillaryData(),
182  &query_mask));
183  aggregate_search_result_set->push_back(aggregate_search_results);
184 
185  }
186 
187  return aggregate_search_result_set;
188 
189 }
190 
192  CRef<IQueryFactory> query_factory,
193  CRef<CBlastOptionsHandle> opt_handle, Int4 & num_extensions,
194  bool include_filtered_reads)
195 {
196  bool isCSRA = false;
197  string csras = kEmptyStr;
198  if(dbs.compare(0, k_CSRA_CHUNK.size(), k_CSRA_CHUNK) == 0) {
199  isCSRA = true;
200  csras = dbs.substr(k_CSRA_CHUNK.size());
201  }
202 
203  CVDBBlastUtil vdbUtil(isCSRA?csras:dbs, true, isCSRA, include_filtered_reads);
204 
205  BlastSeqSrc* seqSrc = vdbUtil.GetSRASeqSrc();
206  CRef<IBlastSeqInfoSrc> seqInfoSrc = vdbUtil.GetSRASeqInfoSrc();
207 
208  CLocalBlast lcl_blast(query_factory, opt_handle, seqSrc, seqInfoSrc);
210  CRef<CSearchResultSet> results;
211  try
212  {
213  results = lcl_blast.Run();
214  }
215  catch (const CBlastException& e) {
216  ERR_POST("BLAST engine error: " << e.what());
217  // Temporary fix to avoid vdb core dump during cleanup SB-1170
218  exit(1);
219  }
220  num_extensions = lcl_blast.GetNumExtensions();
221  return results;
222 }
223 
226  CRef<CBlastOptionsHandle> opt_handle,
227  bool include_filtered_reads)
228 {
229  bool isCSRA = false;
230  string csras = kEmptyStr;
231  if(dbs.compare(0, k_CSRA_CHUNK.size(), k_CSRA_CHUNK) == 0) {
232  isCSRA = true;
233  csras = dbs.substr(k_CSRA_CHUNK.size());
234  }
235 
236  CRef<CVDBBlastUtil> vdbUtil(new CVDBBlastUtil(isCSRA?csras:dbs, false, isCSRA, include_filtered_reads));
237  CRef<CSearchResultSet> results;
238  BlastSeqSrc* seqSrc = vdbUtil->GetSRASeqSrc();
239  CRef<IBlastSeqInfoSrc> seqInfoSrc = vdbUtil->GetSRASeqInfoSrc();
240  CRef<CLocalDbAdapter> db_adapter(new CLocalDbAdapter(seqSrc, seqInfoSrc));
242  psi_opts.Reset(dynamic_cast <CPSIBlastOptionsHandle *> (&*opt_handle));
243 
244  CPsiBlast psi_blast(pssm, db_adapter, psi_opts);
246  try
247  {
248  results = psi_blast.Run();
249  }
250  catch (const CBlastException& e) {
251  ERR_POST("BLAST engine error: " << e.what());
252  // Temporary fix to avoid vdb core dump during cleanup SB-1170
253  exit(1);
254  }
255  return results;
256 }
257 
259 {
262 };
263 
264 
265 class CVDBThread : public CThread
266 {
267 public:
268  CVDBThread(CRef<IQueryFactory> query_factory,
269  vector<string> & chunks,
270  CRef<CBlastOptions> options, bool include_filtered_reads);
271 
273  vector<string> & chunks,
274  CRef<CBlastOptions> options, bool include_filtered_reads);
275 
276  void * Main(void);
277 private:
279 
282 
284  vector<string> m_chunks;
289 };
290 
291 /* CVDBThread */
293  vector<string> & chunks,
294  CRef<CBlastOptions> options,
295  bool include_filtered_reads):
296  m_chunks(chunks), m_include_filtered_reads(include_filtered_reads),
297  m_num_extensions(0), m_pssm(pssm)
298 
299 {
301 }
302 
303 
305  vector<string> & chunks,
306  CRef<CBlastOptions> options,
307  bool include_filtered_reads):
308  m_query_factory(query_factory), m_chunks(chunks),
309  m_include_filtered_reads(include_filtered_reads), m_num_extensions(0)
310 
311 {
312  if(options->GetProgramType() == eBlastTypeBlastn)
314  else
316  //m_opt_handle->GetOptions().DebugDumpText(NcbiCerr, "BLAST options", 1);
317 }
318 
319 void* CVDBThread::Main(void)
320 {
322  if(m_chunks.size() == 1) {
323  if(m_pssm.Empty()) {
324  result->thread_result_set = s_RunLocalVDBSearch(m_chunks[0], m_query_factory,
326  }
327  else {
329  }
330  }
331  else {
332  result->thread_result_set = RunTandemSearches();
333  }
334 
335  result->num_extensions = m_num_extensions;
336  return result;
337 
338 }
339 
341 {
342  unsigned int num_of_chunks = m_chunks.size();
343  vector<CRef<CSearchResultSet> > results;
344 
345  for(unsigned int i=0; i < num_of_chunks; i++) {
346  Int4 num_exts = 0;
347  if(m_pssm.Empty()){
348  results.push_back(s_RunLocalVDBSearch(m_chunks[i], m_query_factory,
350  }
351  else {
353  }
354  m_num_extensions +=num_exts;
355  }
356 
357  return s_CombineSearchSets(results, num_of_chunks,m_opt_handle->GetOptions().GetHitlistSize());
358 }
359 
360 
361 
362 
363 
364 /* CLocalVDBBlast */
367  SLocalVDBStruct & local_vdb,
368  bool include_filtered_reads):
369  m_query_vector(query_vector),
370  m_opt_handle(options),
371  m_total_num_seqs(local_vdb.total_num_seqs),
372  m_total_length(local_vdb.total_length),
373  m_chunks_for_thread(local_vdb.chunks_for_thread),
374  m_num_threads(local_vdb.chunks_for_thread.size()),
375  m_num_extensions(0),
376  m_include_filtered_reads(include_filtered_reads)
377 {
378 }
379 
380 /* CLocalVDBBlast */
383  SLocalVDBStruct & local_vdb,
384  bool include_filtered_reads):
385  m_opt_handle(options),
386  m_total_num_seqs(local_vdb.total_num_seqs),
387  m_total_length(local_vdb.total_length),
388  m_chunks_for_thread(local_vdb.chunks_for_thread),
389  m_num_threads(local_vdb.chunks_for_thread.size()),
390  m_num_extensions(0),
391  m_include_filtered_reads(include_filtered_reads),
392  m_pssm(pssm)
393 {
394 }
395 
396 
398 {
400  string db_name;
403 };
404 static bool s_SortDbSize(const SSortStruct & a, const SSortStruct & b)
405 {
406  return(a.length > b.length);
407 }
408 
409 static void
410 s_DivideDBsForThread(unsigned int num_threads, vector<SSortStruct> & in_list,
411  vector<vector<SSortStruct> > & out_list, vector<Uint8> & acc_size)
412 {
413  sort(in_list.begin(), in_list.end(),s_SortDbSize);
414 
415  for(unsigned int i=0; i < in_list.size(); i++)
416  {
417  unsigned int min_index = 0;
418  for(unsigned int j=1; j<num_threads; j++) {
419  if(acc_size[j] < acc_size[min_index])
420  min_index = j;
421  }
422  acc_size[min_index] += in_list[i].length;
423  out_list[min_index].push_back(in_list[i]);
424  }
425 }
426 
427 static void
428 s_RemoveNonCSRAEntry(vector<SSortStruct> & in_list)
429 {
430  vector<SSortStruct> filtered_list;
431  for(unsigned int i = 0; i < in_list.size(); i++) {
432  if(in_list[i].db_name == k_NOT_CSRA_DB)
433  continue;
434 
435  filtered_list.push_back(in_list[i]);
436  }
437  in_list.swap(filtered_list);
438 }
439 
440 
441 static void s_GetChunksForThread(vector<SSortStruct> & in_list, vector<string> & chunks,
442  const unsigned int dbs_per_chunk, const string tag)
443 {
444  if(in_list.size() == 0)
445  return;
446 
447  Uint8 num_seqs_count = 0;
448  string dbs = tag;
449  unsigned int db_count = 0;
450  for(unsigned int i=0; i < in_list.size(); i++) {
451  if((in_list[i].db_name != kEmptyStr) && (in_list[i].num_seqs != 0)) {
452  num_seqs_count += in_list[i].num_seqs;
453  if(num_seqs_count > (Uint8)kMax_I4 || db_count >= dbs_per_chunk) {
454  chunks.push_back(dbs);
455  _TRACE("Chunk: " << dbs << " Num Seqs: " << num_seqs_count -in_list[i].num_seqs);
456  dbs = tag + in_list[i].db_name;
457  num_seqs_count = in_list[i].num_seqs;
458  db_count =1;
459  }
460  else {
461  dbs +=in_list[i].db_name;
462  db_count ++;
463  }
464  dbs +=" ";
465  }
466  }
467  if(dbs != tag) {
468  chunks.push_back(dbs);
469  _TRACE("Chunk: " << dbs << " Num Seqs: " << num_seqs_count);
470  }
471 }
472 
473 static const unsigned int DEFAULT_MAX_DBS_OPEN = 8000;
474 static const unsigned int DEFAULT_MAX_DBS_PER_CHUNK = 1500;
475 
476 static unsigned int s_GetNumDbsPerChunk(unsigned int num_threads, unsigned int num_dbs)
477 {
478  unsigned int dbs_per_chunk = min(DEFAULT_MAX_DBS_OPEN/num_threads, num_dbs/num_threads);
479 
481  string max_dbs_env = env.Get("VDB_MAX_DBS_PER_CHUNK");
482  unsigned int max_dbs_per_chunk = DEFAULT_MAX_DBS_PER_CHUNK;
483  if(max_dbs_env != kEmptyStr) {
484  max_dbs_per_chunk = min(max_dbs_per_chunk, NStr::StringToUInt(max_dbs_env));
485  }
486 
487  if(max_dbs_per_chunk && max_dbs_per_chunk < dbs_per_chunk)
488  dbs_per_chunk = max_dbs_per_chunk;
489 
490  return dbs_per_chunk;
491 
492 }
493 
494 string CLocalVDBBlast::PreprocessDBs(SLocalVDBStruct & local_vdb, const string db_names, unsigned int threads,
495  ESRASearchMode search_mode)
496 {
497  vector<string> dbs;
498  NStr::Split(db_names, " ", dbs);
499  std::sort(dbs.begin(), dbs.end());
500  vector<string>::iterator uq = std::unique(dbs.begin(), dbs.end());
501  dbs.erase(uq, dbs.end());
502 
503  unsigned int num_dbs = dbs.size();
504  unsigned int num_threads = (num_dbs < threads) ? num_dbs : threads;
505 
506  vector <SSortStruct> p, r;
507  Uint8 total_length = 0;
508  Uint8 total_num_seqs = 0;
509  string openmp_exception = kEmptyStr;
510 
511  //CStopWatch sw(CStopWatch::eStart);
512  if(search_mode == eUnaligned) {
513  p.resize(num_dbs);
514  #pragma omp parallel for num_threads(num_threads) schedule(static) if (num_threads > 1) \
515  shared(num_dbs, p, dbs) reduction(+ : total_length, total_num_seqs)
516  for(unsigned int i=0; i < num_dbs; i++) {
517  try {
518  p[i].db_name = dbs[i];
519  CVDBBlastUtil::GetVDBStats(p[i].db_name, p[i].num_seqs, p[i].length);
520  if(p[i].num_seqs > (Uint8)kMax_I4) {
521  NCBI_THROW(CException, eUnknown, p[i].db_name + " has more than 2 billions seqs, exceeds max num of seqs supported");
522  }
523  total_length += p[i].length;
524  total_num_seqs += p[i].num_seqs;
525  } catch (CException & e) {
526  #pragma omp critical
527  openmp_exception += e.what();
528  }
529  }
530  }
531  else if(search_mode == eBoth) {
532  p.resize(num_dbs);
533  r.resize(num_dbs);
534  #pragma omp parallel for num_threads(num_threads) schedule(static) if (num_threads > 1) \
535  shared(num_dbs, p,r, dbs) reduction(+ : total_length, total_num_seqs)
536  for(unsigned int i=0; i < num_dbs; i++) {
537  try {
538  p[i].db_name = dbs[i];
539  if(CVDBBlastUtil::IsCSRA(dbs[i])) {
540  r[i].db_name = dbs[i];
541  CVDBBlastUtil::GetAllStats(p[i].db_name, p[i].num_seqs, p[i].length, r[i].num_seqs, r[i].length);
542  }
543  else {
544  CVDBBlastUtil::GetVDBStats(p[i].db_name, p[i].num_seqs, p[i].length);
545  r[i].db_name = k_NOT_CSRA_DB;
546  }
547  if((p[i].num_seqs > (Uint8)kMax_I4) || (r[i].num_seqs > (Uint8)kMax_I4)) {
548  NCBI_THROW(CException, eUnknown, dbs[i] + " has more than 2 billions seqs, exceeds max num of seqs supported");
549  }
550  total_length += (p[i].length + r[i].length);
551  total_num_seqs += (p[i].num_seqs + r[i].num_seqs);
552  } catch (CException & e) {
553  #pragma omp critical
554  openmp_exception += e.what();
555  }
556  }
557  }
558  else if (search_mode == eAligned) {
559  r.resize(num_dbs);
560  #pragma omp parallel for num_threads(num_threads) schedule(static) if (num_threads > 1) \
561  shared(num_dbs, r, dbs) reduction(+ : total_length, total_num_seqs)
562  for(unsigned int i=0; i < num_dbs; i++) {
563  try {
564  if(CVDBBlastUtil::IsCSRA(dbs[i])) {
565  r[i].db_name = dbs[i];
566  CVDBBlastUtil::GetVDBStats(r[i].db_name, r[i].num_seqs, r[i].length, true);
567  if(r[i].num_seqs > (Uint8)kMax_I4) {
568  NCBI_THROW(CException, eUnknown, dbs[i] + " has more than 2 billions seqs, exceeds max num of seqs supported");
569  }
570  total_length += r[i].length;
571  total_num_seqs += r[i].num_seqs;
572  }
573  else {
574  r[i].db_name = k_NOT_CSRA_DB;
575  }
576  } catch (CException & e) {
577  #pragma omp critical
578  openmp_exception += e.what();
579  }
580  }
581  }
582  else {
583  NCBI_THROW(CException, eUnknown, " Invalid Search Mode");
584  }
585 
586  if(openmp_exception != kEmptyStr) {
587  NCBI_THROW(CException, eUnknown, openmp_exception);
588  }
589  //cerr << "Time to process dbs : " << sw.Elapsed() << endl;
590 
591  if(search_mode != eUnaligned){
593  num_dbs = r.size() + p.size();
594  if(r.size() == 0) {
595  ERR_POST(Warning << "No CSRA db found.");
596  }
597  else {
598  Uint4 max_csra_thread = CVDBBlastUtil::GetMaxNumCSRAThread();
599  if(max_csra_thread != 0 && threads > max_csra_thread) {
600  threads = max_csra_thread;
601  }
602  num_threads = (num_dbs < threads) ? num_dbs : threads;
603  }
604  }
605 
606  local_vdb.total_length = total_length;
607 
608  if(total_num_seqs > (Uint8) kMax_I4) {
609  local_vdb.total_num_seqs = kMax_I4;
610  }
611  else if(total_num_seqs == 0){
612  local_vdb.total_num_seqs = 0;
613  string zero_seq_err = "DB list contains no searchable seqs in sra_mode " + NStr::IntToString(search_mode) +".";
614  NCBI_THROW(CException, eUnknown, zero_seq_err);
615 
616  }
617  else {
618  local_vdb.total_num_seqs = total_num_seqs;
619  }
620 
621  vector<Uint8> acc_size(num_threads, 0);
622  local_vdb.chunks_for_thread.resize(num_threads);
623  unsigned int dbs_per_chunk = s_GetNumDbsPerChunk(num_threads, num_dbs);
624 
625  if(search_mode != eAligned){
626  vector<vector<SSortStruct> > list_thread(num_threads);
627  if(num_threads != kDisableThreadedSearch) {
628  s_DivideDBsForThread(num_threads, p, list_thread, acc_size);
629  }
630  else {
631  list_thread[0]= p;
632  }
633  for(unsigned int t=0; t < num_threads; t++) {
634  s_GetChunksForThread(list_thread[t], local_vdb.chunks_for_thread[t], dbs_per_chunk, kEmptyStr);
635  }
636  }
637 
638  if((search_mode != eUnaligned) && (r.size() > 0)){
639  vector<vector<SSortStruct> > list_thread(num_threads);
640  if(num_threads != kDisableThreadedSearch) {
641  s_DivideDBsForThread(num_threads, r, list_thread, acc_size);
642  }
643  else {
644  list_thread[0]= r;
645  }
646  for(unsigned int t=0; t < num_threads; t++) {
647  s_GetChunksForThread(list_thread[t], local_vdb.chunks_for_thread[t], dbs_per_chunk, k_CSRA_CHUNK);
648  }
649  }
650 
651  return NStr::Join (dbs, " ");
652 }
653 
655 {
657  {
658  if(m_chunks_for_thread[0].size() == 1)
659  return;
660  }
661 
663  return;
664 
665  if(m_opt_handle->GetOptions().GetDbLength()!= 0)
666  return;
667 
670 
671  return;
672 }
673 
674 void s_TrimResults(CSearchResultSet & rs, int hit_list_size)
675 {
676  for(unsigned int i=0; i < rs.size(); i++)
677  {
678  rs[i].TrimSeqAlign(hit_list_size);
679  }
680 }
681 
/// Return @a orig_size enlarged by a padding that shrinks as the size grows:
/// +100 for sizes up to 200, +75 for sizes below 500, +50 otherwise.
///
/// @param orig_size  Requested hit list size.
/// @return The padded hit list size.
static int s_GetModifiedHitlistSize(const int orig_size)
{
    int padding = 50;
    if (orig_size <= 200)
    {
        padding = 100;
    }
    else if (orig_size < 500)
    {
        padding = 75;
    }
    return orig_size + padding;
}
696 
698 {
699  x_AdjustDbSize();
700 
702  {
703  // For 2na seq, the start and end of a seq are not byte align,
704  // making the hit list size bigger than normal to compensate
705  // for match and mismatch of the "unused" bases which could
706  // bump a better hit off the list
707  int hls = m_opt_handle->GetOptions().GetHitlistSize() ;
709  vector<string> & chunks=m_chunks_for_thread[0];
710  unsigned int num_chunks = m_chunks_for_thread[0].size();
711 
712  if(num_chunks == 1) {
713  //m_opt_handle->GetOptions().DebugDumpText(NcbiCerr, "BLAST options", 1);
714  CRef<CSearchResultSet> retval;
715  if(m_pssm.Empty()){
718  }
719  else {
721  }
723  s_TrimResults(*retval, hls);
724  return retval;
725  }
726  else
727  {
728  vector<CRef<CSearchResultSet> > results;
729  for(unsigned int i=0; i < num_chunks; i++)
730  {
731  if(m_pssm.Empty()){
732  Int4 num_exts = 0;
734  results.push_back(s_RunLocalVDBSearch(chunks[i], queries, m_opt_handle, num_exts, m_include_filtered_reads));
735  m_num_extensions += num_exts;
736  }
737  else {
738  results.push_back(s_RunPsiVDBSearch(chunks[i], m_pssm, m_opt_handle, m_include_filtered_reads));
739  }
740  }
741 
744  s_TrimResults(*retval, hls);
745  return retval;
746  }
747  }
748  else
749  {
750  return RunThreadedSearch();
751  }
752 }
753 
755 {
756  pssm.clear();
757  pssm.resize(m_num_threads);
758  pssm[0] = m_pssm;
759  for(unsigned int v=1; v < m_num_threads; v++) {
760  CNcbiStrstream oss;
761  oss << MSerial_AsnBinary << *m_pssm;
762  pssm[v].Reset(new CPssmWithParameters);
763  oss >> MSerial_AsnBinary >> *(pssm[v]);
764  }
765 }
766 
768 {
770  for(unsigned int i=0; i < orig.Size(); i++)
771  {
772  CRef<CBlastSearchQuery> q(new CBlastSearchQuery(*(orig[i]->GetQuerySeqLoc()), *(orig[i]->GetScope())));
773  q->SetMaskedRegions(orig[i]->GetMaskedRegions());
774  q->SetGeneticCodeId(orig[i]->GetGeneticCodeId());
775  clone->push_back(q);
776  }
777  return clone;
778 }
779 
781 {
782 
783  vector<CRef<CBlastQueryVector> > query_v(m_num_threads);
784  query_v[0] = m_query_vector;
785  for(unsigned int v=1; v < m_num_threads; v++)
786  {
788  }
789 
790  qf_v.resize(m_num_threads);
791  for(unsigned int v=0; v < m_num_threads; v++)
792  {
795  query_data->GetSequenceBlk();
796  qf_v[v] = queries;
797  }
798 }
799 
801 {
802  vector<SVDBThreadResult *> thread_results(m_num_threads, NULL);
803  vector <CVDBThread* > thread(m_num_threads, NULL);
804  vector<CRef<CSearchResultSet> > results;
805  vector<CRef<IQueryFactory> > query_factory;
806  vector<CRef<CPssmWithParameters> > pssm;
807  bool isPSI = m_pssm.NotEmpty();
808 
809  if(isPSI) {
810  pssm.resize(m_num_threads);
811  x_PreparePssm(pssm);
812  }
813  else {
814  query_factory.resize(m_num_threads);
815  x_PrepareQuery(query_factory);
816  }
817 
818  for(unsigned int t=0; t < m_num_threads; t++)
819  {
820  // CThread destructor is protected, all threads destroy themselves when terminated
821  if (isPSI) {
823  }
824  else {
825  thread[t] = (new CVDBThread(query_factory[t], m_chunks_for_thread[t], m_opt_handle->SetOptions().Clone(), m_include_filtered_reads));
826  }
827  thread[t]->Run();
828  }
829 
830  for(unsigned int t=0; t < m_num_threads; t++)
831  {
832  thread[t]->Join(reinterpret_cast<void**> (&thread_results[t]));
833  }
834 
835  for(unsigned int t=0; t < m_num_threads; t++)
836  {
837  if(thread_results[t] == NULL) {
838  NCBI_THROW(CException, eUnknown, "Search Error");
839  }
840  m_num_extensions += thread_results[t]->num_extensions;
841  results.push_back(thread_results[t]->thread_result_set);
842  delete (thread_results[t]);
843  }
844 
846 
847 }
848 
850 {
851  return m_num_extensions;
852 }
853 
854 END_SCOPE(blast)
#define static
Declares the CBlastNucleotideOptionsHandle class.
@ eBlastTypeBlastn
Definition: blast_program.h:74
Defines BLAST error codes (user errors included)
Handle to the nucleotide-nucleotide options to the BLAST algorithm.
Query Vector.
Definition: sseqloc.hpp:276
void push_back(const value_type &element)
Add a value to the back of this container.
Definition: sseqloc.hpp:397
Search Query.
Definition: sseqloc.hpp:147
Class to perform a BLAST search on local BLAST databases. Note that PHI-BLAST can be run using this cl...
Definition: local_blast.hpp:62
Interface to create a BlastSeqSrc suitable for use in CORE BLAST from a variety of BLAST database/s...
bool m_include_filtered_reads
CRef< CBlastQueryVector > m_query_vector
CRef< CBlastOptionsHandle > m_opt_handle
void x_AdjustDbSize(void)
CRef< objects::CPssmWithParameters > m_pssm
static string PreprocessDBs(CLocalVDBBlast::SLocalVDBStruct &local_vdb, const string db_names, unsigned int num_threads=kDisableThreadedSearch, ESRASearchMode seach_mode=eAligned)
void x_PrepareQuery(vector< CRef< IQueryFactory > > &qf_v)
void x_PreparePssm(vector< CRef< CPssmWithParameters > > &pssm)
CLocalVDBBlast(const CLocalVDBBlast &)
static const unsigned int kDisableThreadedSearch
CRef< CSearchResultSet > RunThreadedSearch()
CRef< CSearchResultSet > Run()
unsigned int m_num_threads
vector< vector< string > > & m_chunks_for_thread
CNcbiEnvironment –.
Definition: ncbienv.hpp:110
NCBI C++ Object Manager dependant implementation of IQueryFactory.
Handle to the protein-protein options to the BLAST algorithm.
Runs a single iteration of the PSI-BLAST algorithm on a BLAST database.
Definition: psiblast.hpp:79
Search Results for All Queries.
Search Results for One Query.
bool IsEmpty() const
Handle to the protein-translated nucleotide options to the BLAST algorithm.
CVDBBlastUtil.
BlastSeqSrc * GetSRASeqSrc()
Return the stored SRA BlastSeqSrc object.
static Uint4 GetMaxNumCSRAThread(void)
static void GetVDBStats(const string &strAllRuns, Uint8 &num_seqs, Uint8 &length, bool getRefStats=false)
Function to get around the OID (BlastSeqSrc) limit, so that a number of sequences greater than Int4 can be returned.
static bool IsCSRA(const string &db_name)
CRef< blast::IBlastSeqInfoSrc > GetSRASeqInfoSrc()
Return the SRA BlastSeqInfoSrc object (create if none exists).
static void GetAllStats(const string &strAllRuns, Uint8 &num_seqs, Uint8 &length, Uint8 &ref_num_seqs, Uint8 &ref_length)
CVDBThread(const CVDBThread &)
void * Main(void)
Derived (user-created) class must provide a real thread function.
CRef< CBlastOptionsHandle > m_opt_handle
CRef< CSearchResultSet > RunTandemSearches(void)
CRef< CPssmWithParameters > m_pssm
CVDBThread & operator=(const CVDBThread &)
CRef< IQueryFactory > m_query_factory
vector< string > m_chunks
bool m_include_filtered_reads
CVDBThread(CRef< IQueryFactory > query_factory, vector< string > &chunks, CRef< CBlastOptions > options, bool include_filtered_reads)
Collection of masked regions for a single query sequence.
Definition: seqlocinfo.hpp:113
Class for the messages for an individual query sequence.
static bool DLIST_NAME() in_list(DLIST_LIST_TYPE *list, DLIST_TYPE *item)
Definition: dlist.tmpl.h:103
static HENV env
Definition: transaction2.c:38
CRef< CSearchResultSet > Run()
Run the PSI-BLAST engine for one iteration.
Definition: psiblast.cpp:95
virtual void SetNumberOfThreads(size_t nthreads)
Mutator for the number of threads.
void SetHitlistSize(int s)
CRef< CSearchResultSet > Run()
Executes the search.
virtual BLAST_SequenceBlk * GetSequenceBlk()=0
Accessor for the BLAST_SequenceBlk structure.
CRef< ILocalQueryData > MakeLocalQueryData(const CBlastOptions *opts)
Creates and caches an ILocalQueryData.
Definition: query_data.cpp:52
int GetHitlistSize() const
CBlastOptions & SetOptions()
Returns a reference to the internal options class which this object is a handle for.
void SetDbSeqNum(unsigned int n)
void SetDbLength(Int8 l)
EBlastProgramType GetProgramType() const
Returns the CORE BLAST notion of program type.
size_type size() const
Identical to GetNumResults, provided to facilitate STL-style iteration.
Int8 GetDbLength() const
const CBlastOptions & GetOptions() const
Return the object which this object is a handle for.
CRef< CBlastOptions > Clone() const
Explicit deep copy of the Blast options object.
void push_back(value_type &element)
Add a value to the back of this container.
void Combine(const TQueryMessages &other)
Combine other messages with these.
Definition: blast_aux.cpp:978
void clear()
Clears the contents of this object.
Int4 GetNumExtensions()
Retrieve the number of extensions performed during the search.
Int8 GetEffectiveSearchSpace() const
#define NULL
Definition: ncbistd.hpp:225
#define _TRACE(message)
Definition: ncbidbg.hpp:122
#define ERR_POST(message)
Error posting with file, line number information but without error codes.
Definition: ncbidiag.hpp:186
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
void Warning(CExceptionArgs_Base &args)
Definition: ncbiexpt.hpp:1191
virtual const char * what(void) const noexcept
Standard report (includes full backlog).
Definition: ncbiexpt.cpp:342
#define MSerial_AsnBinary
Definition: serialbase.hpp:697
bool Match(const CSeq_id &sid2) const
Match() - TRUE if SeqIds are equivalent.
Definition: Seq_id.hpp:1065
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:1439
void Reset(void)
Reset reference object.
Definition: ncbiobj.hpp:773
bool NotEmpty(void) const THROWS_NONE
Check if CRef is not empty – pointing to an object and has a non-null value.
Definition: ncbiobj.hpp:726
bool Empty(void) const THROWS_NONE
Check if CRef is empty – not pointing to any object, which means having a null value.
Definition: ncbiobj.hpp:719
#define kMax_UI8
Definition: ncbi_limits.h:222
int32_t Int4
4-byte (32-bit) signed integer
Definition: ncbitype.h:102
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
uint64_t Uint8
8-byte (64-bit) unsigned integer
Definition: ncbitype.h:105
#define kMax_I4
Definition: ncbi_limits.h:218
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define END_SCOPE(ns)
End the previously defined scope.
Definition: ncbistl.hpp:75
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
#define BEGIN_SCOPE(ns)
Define a new scope.
Definition: ncbistl.hpp:72
#define kEmptyStr
Definition: ncbistr.hpp:123
static list< string > & Split(const CTempString str, const CTempString delim, list< string > &arr, TSplitFlags flags=0, vector< SIZE_TYPE > *token_pos=NULL)
Split a string using specified delimiters.
Definition: ncbistr.cpp:3461
static string IntToString(int value, TNumToStringFlags flags=0, int base=10)
Convert int to string.
Definition: ncbistr.hpp:5084
static string Join(const TContainer &arr, const CTempString &delim)
Join strings using the specified delimiter.
Definition: ncbistr.hpp:2697
static unsigned int StringToUInt(const CTempString str, TStringToNumFlags flags=0, int base=10)
Convert string to unsigned int.
Definition: ncbistr.cpp:642
Tdata & Set(void)
Assign a value to data member.
list< CRef< CSeq_align > > Tdata
const Tdata & Get(void) const
Get the member data.
where both are integers</td></tr><tr><td>tse</td><td>optional</td><td>String</td><td class="description">TSE option controls what blob is orig
exit(2)
int i
Main class to perform a BLAST search on the local machine.
constexpr auto sort(_Init &&init)
const struct ncbi::grid::netcache::search::fields::SIZE size
unsigned int a
Definition: ncbi_localip.c:102
EIPRangeType t
Definition: ncbi_localip.c:101
const char * tag
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Defines NCBI C++ exception handling.
Defines classes: CDirEntry, CFile, CDir, CSymLink, CMemoryFile, CFileUtil, CFileLock,...
Multi-threading – mutexes; rw-locks; semaphore.
Multi-threading – classes, functions, and features.
Defines: CTimeFormat - storage class for time format.
T min(T x_, T y_)
double r(size_t dimension_, const Int4 *score_, const double *prob_, double theta_)
NOTE: This file contains work in progress and the APIs are likely to change, please do not rely on th...
Declares CPsiBlast, the C++ API for the PSI-BLAST engine.
Declares the CPSIBlastOptionsHandle class.
static bool GetSeqId(const T &d, set< string > &labels, const string name="", bool detect=false, bool found=false)
Complete type definition of Blast Sequence Source ADT.
Definition: blast_seqsrc.c:43
vector< vector< string > > chunks_for_thread
CRef< CSearchResultSet > thread_result_set
Declares the CTBlastnOptionsHandle class.
else result
Definition: token2.c:20
CScope & GetScope()
const string k_CSRA_CHUNK("CSRA_CHUNK: ")
CRef< CSearchResultSet > s_RunLocalVDBSearch(const string &dbs, CRef< IQueryFactory > query_factory, CRef< CBlastOptionsHandle > opt_handle, Int4 &num_extensions, bool include_filtered_reads)
void s_TrimResults(CSearchResultSet &rs, int hit_list_size)
static CRef< CSearchResultSet > s_CombineSearchSets(vector< CRef< CSearchResultSet > > &t, unsigned int num_of_threads, const int list_size)
static void s_DivideDBsForThread(unsigned int num_threads, vector< SSortStruct > &in_list, vector< vector< SSortStruct > > &out_list, vector< Uint8 > &acc_size)
static void s_GetChunksForThread(vector< SSortStruct > &in_list, vector< string > &chunks, const unsigned int dbs_per_chunk, const string tag)
static const unsigned int DEFAULT_MAX_DBS_PER_CHUNK
static const unsigned int DEFAULT_MAX_DBS_OPEN
static void s_RemoveNonCSRAEntry(vector< SSortStruct > &in_list)
static int s_GetModifiedHitlistSize(const int orig_size)
CRef< CBlastQueryVector > s_CloneBlastQueryVector(const CBlastQueryVector &orig)
static bool s_SortDbSize(const SSortStruct &a, const SSortStruct &b)
const string k_NOT_CSRA_DB("NOT_CSRA")
static unsigned int s_GetNumDbsPerChunk(unsigned int num_threads, unsigned int num_dbs)
CRef< CSearchResultSet > s_RunPsiVDBSearch(const string &dbs, CRef< CPssmWithParameters > pssm, CRef< CBlastOptionsHandle > opt_handle, bool include_filtered_reads)
static void s_MergeAlignSet(CSeq_align_set &final_set, const CSeq_align_set &input_set, const int list_size)
Declares the CLocalVDBBlast class.
#define const
Definition: zconf.h:232
Modified on Sun Jun 16 04:33:07 2024 by modify_doxy.py rev. 669887