50 static const size_t delimiter_len = sizeof(delimiter) - 1; // exclude \0
53 static void s_ConvertConcatStringToVectorOfString(const string & s, vector<string> & v)
58 size_t pos_find = s.find(delimiter, pos_start);
59 if(pos_find == string::npos)
61 size_t length = pos_find - pos_start;
62 v.push_back(s.substr(pos_start, length));
63 pos_start = pos_find + delimiter_len;
66 v.push_back(s.substr(pos_start, s.size() - pos_start ));
69 static void s_MergeAlignSet(CSeq_align_set & final_set, const CSeq_align_set & input_set)
71 CSeq_align_set::Tdata & final_list = final_set.Set();
72 const CSeq_align_set::Tdata & input_list = input_set.Get();
74 CSeq_align_set::Tdata::const_iterator input_it = input_list.begin();
75 CSeq_align_set::Tdata::iterator final_it = final_list.begin();
76 while(input_it != input_list.end())
81 (*final_it)->GetNamedScore(CSeq_align::eScore_EValue, final_evalue);
82 (*input_it)->GetNamedScore(CSeq_align::eScore_EValue, input_evalue);
84 if(input_evalue == final_evalue)
86 //Pulling a trick here to keep the program flow simple
87 //Replace the final evalue with input bitscore and vice versa
88 (*final_it)->GetNamedScore(CSeq_align::eScore_BitScore, input_evalue);
89 (*input_it)->GetNamedScore(CSeq_align::eScore_BitScore, final_evalue);
92 if(input_evalue < final_evalue)
94 CSeq_align_set::Tdata::const_iterator start_input_it = input_it;
97 const CSeq_id & id_prev = (*input_it)->GetSeq_id(1);
99 if(input_it == input_list.end())
104 if(! id_prev.Match((*input_it)->GetSeq_id(1)))
110 final_list.insert(final_it, start_input_it, input_it);
116 const CSeq_id & id_prev = (*final_it)->GetSeq_id(1);
119 if(final_it == final_list.end())
124 if(! id_prev.Match((*final_it)->GetSeq_id(1)))
130 if(final_it == final_list.end())
132 final_list.insert(final_it, input_it, input_list.end());
139 static CRef<CSearchResultSet> s_CombineSearchSets(vector<CRef<CSearchResultSet> > & t, unsigned int num_of_threads)
141 CRef<CSearchResultSet> aggregate_search_result_set (new CSearchResultSet());
142 aggregate_search_result_set->clear();
144 for(unsigned int i=0; i < t[0]->GetNumQueries(); i++)
146 vector< CRef<CSearchResults> > thread_results;
147 thread_results.push_back (CRef<CSearchResults> (&((*(t[0]))[i])));
148 const CSeq_id & id = *(thread_results[0]->GetSeqId());
150 for(unsigned int d=1; d < num_of_threads; d++)
152 thread_results.push_back ((*(t[d]))[id]);
155 CRef<CSeq_align_set> align_set(new CSeq_align_set);
156 TQueryMessages aggregate_messages;
157 for(unsigned int d=0; d< num_of_threads; d++)
159 if(thread_results[d]->HasAlignments())
161 CConstRef<CSeq_align_set> thread_align_set = thread_results[d]->GetSeqAlign();
162 if(align_set->IsEmpty())
164 align_set->Set().insert(align_set->Set().begin(),
165 thread_align_set->Get().begin(),
166 thread_align_set->Get().end());
170 s_MergeAlignSet(*align_set, *thread_align_set);
173 aggregate_messages.Combine(thread_results[d]->GetErrors());
176 TMaskedQueryRegions query_mask;
177 thread_results[0]->GetMaskedQueryRegions(query_mask);
178 CRef<CSearchResults> aggregate_search_results (new CSearchResults(thread_results[0]->GetSeqId(),
181 thread_results[0]->GetAncillaryData(),
183 thread_results[0]->GetRID()));
184 aggregate_search_result_set->push_back(aggregate_search_results);
188 return aggregate_search_result_set;
192 static void s_ModifyVolumePaths(vector<string> & rps_database)
194 for(unsigned int i=0; i < rps_database.size(); i++)
196 size_t found = rps_database[i].find(".pal");
197 if(string::npos != found)
198 rps_database[i]= rps_database[i].substr(0, found);
202 static bool s_SortDbSize(const pair<string, Int8> & a, const pair<string, Int8> & b)
204 return(a.second > b.second);
207 static void s_MapDbToThread(vector<string> & db, unsigned int num_of_threads)
209 unsigned int db_size = static_cast<unsigned int>(db.size());
210 vector <pair <string, Int8> > p;
212 for(unsigned int i=0; i < db_size; i++)
215 CSeqDB::FindVolumePaths(db[i], CSeqDB::eProtein, path, NULL, true);
216 _ASSERT(path.size() == 1);
217 CFile f(path[0]+".loo");
218 Int8 length = f.GetLength();
219 _ASSERT(length > 0 );
220 //Scale down, just in case
221 p.push_back(make_pair(db[i], length/1000));
224 sort(p.begin(), p.end(),s_SortDbSize);
226 db.resize(num_of_threads);
227 vector<Int8> acc_size(num_of_threads, 0);
229 for(unsigned char i=0; i < num_of_threads; i++)
232 acc_size[i] = p[i].second;
235 for(unsigned int i= num_of_threads; i < db_size; i++)
237 unsigned int min_index = 0;
238 for(unsigned int j=1; j<num_of_threads; j++)
240 if(acc_size[j] < acc_size[min_index])
244 acc_size[min_index] += p[i].second;
245 db[min_index] = db[min_index] + delimiter + p[i].first;
250 CRef<CSearchResultSet> s_RunLocalRpsSearch(const string & db,
251 CBlastQueryVector & query_vector,
252 CRef<CBlastOptionsHandle> opt_handle)
254 CSearchDatabase search_db(db, CSearchDatabase::eBlastDbIsProtein);
255 CRef<CLocalDbAdapter> db_adapter(new CLocalDbAdapter(search_db));
256 CRef<IQueryFactory> queries(new CObjMgr_QueryFactory(query_vector));
258 CLocalBlast lcl_blast(queries, opt_handle, db_adapter);
259 CRef<CSearchResultSet> results = lcl_blast.Run();
265 class CRPSThread : public CThread
268 CRPSThread(CRef<CBlastQueryVector> query_vector,
270 CRef<CBlastOptions> options);
275 CRef<CSearchResultSet> RunTandemSearches(void);
277 CRPSThread(const CRPSThread &);
278 CRPSThread & operator=(const CRPSThread &);
281 CRef<CBlastOptionsHandle> m_opt_handle;
282 CRef<CBlastQueryVector> m_query_vector;
287 CRPSThread::CRPSThread(CRef<CBlastQueryVector> query_vector,
289 CRef<CBlastOptions> options):
290 m_query_vector(query_vector)
293 m_opt_handle.Reset(new CBlastRPSOptionsHandle(options));
295 s_ConvertConcatStringToVectorOfString(db, m_db);
298 void* CRPSThread::Main(void)
300 CRef<CSearchResultSet> * result = new (CRef<CSearchResultSet>);
303 *result = s_RunLocalRpsSearch(m_db[0],
309 *result = RunTandemSearches();
315 CRef<CSearchResultSet> CRPSThread::RunTandemSearches(void)
317 unsigned int num_of_db = static_cast<unsigned int>(m_db.size());
318 vector<CRef<CSearchResultSet> > results;
320 for(unsigned int i=0; i < num_of_db; i++)
322 results.push_back(s_RunLocalRpsSearch(m_db[i],
327 return s_CombineSearchSets(results, num_of_db);
330 /* CThreadedRpsBlast */
331 CLocalRPSBlast::CLocalRPSBlast(CRef<CBlastQueryVector> query_vector,
333 CRef<CBlastOptionsHandle> options,
334 unsigned int num_of_threads):
335 m_num_of_threads(num_of_threads),
337 m_opt_handle(options),
338 m_query_vector(query_vector),
341 CSeqDB::FindVolumePaths(db, CSeqDB::eProtein, m_rps_databases, NULL, true, true);
342 m_num_of_dbs = static_cast<unsigned int>(m_rps_databases.size());
343 if( 1 == m_num_of_dbs)
345 m_num_of_threads = kDisableThreadedSearch;
349 void CLocalRPSBlast::x_AdjustDbSize(void)
351 if(m_opt_handle->GetOptions().GetEffectiveSearchSpace()!= 0)
354 if(m_opt_handle->GetOptions().GetDbLength()!= 0)
357 CSeqDB db(m_db_name, CSeqDB::eProtein);
359 Uint8 db_size = db.GetTotalLengthStats();
360 int num_seq = db.GetNumSeqsStats();
363 db_size = db.GetTotalLength();
366 num_seq = db.GetNumSeqs();
368 m_opt_handle->SetOptions().SetDbLength(db_size);
369 m_opt_handle->SetOptions().SetDbSeqNum(num_seq);
374 CRef<CSearchResultSet> CLocalRPSBlast::Run(void)
376 if(1 != m_num_of_dbs)
381 if(kDisableThreadedSearch == m_num_of_threads)
383 if(1 == m_num_of_dbs)
385 return s_RunLocalRpsSearch(m_db_name, *m_query_vector, m_opt_handle);
389 s_ModifyVolumePaths(m_rps_databases);
391 vector<CRef<CSearchResultSet> > results;
392 for(unsigned int i=0; i < m_num_of_dbs; i++)
394 results.push_back(s_RunLocalRpsSearch(m_rps_databases[i],
399 return s_CombineSearchSets(results, m_num_of_dbs);
405 return RunThreadedSearch();
409 CRef<CSearchResultSet> CLocalRPSBlast::RunThreadedSearch(void)
412 s_ModifyVolumePaths(m_rps_databases);
414 if((kAutoThreadedSearch == m_num_of_threads) ||
415 (m_num_of_threads > m_rps_databases.size()))
417 //Default num of thread : a thread for each db
418 m_num_of_threads = static_cast<unsigned int>(m_rps_databases.size());
420 else if(m_num_of_threads < m_rps_databases.size())
422 // Combine databases, modified the size of rps_database
423 s_MapDbToThread(m_rps_databases, m_num_of_threads);
426 vector<CRef<CSearchResultSet> * > thread_results(m_num_of_threads, NULL);
427 vector <CRPSThread* > thread(m_num_of_threads, NULL);
428 vector<CRef<CSearchResultSet> > results;
430 for(unsigned int t=0; t < m_num_of_threads; t++)
432 // CThread destructor is protected, all threads destory themselves when terminated
433 thread[t] = (new CRPSThread(m_query_vector, m_rps_databases[t], m_opt_handle->SetOptions().Clone()));
437 for(unsigned int t=0; t < m_num_of_threads; t++)
439 thread[t]->Join(reinterpret_cast<void**> (&thread_results[t]));
442 for(unsigned int t=0; t < m_num_of_threads; t++)
444 results.push_back(*(thread_results[t]));
447 CRef<CBlastRPSInfo> rpsInfo = CSetupFactory::CreateRpsStructures(m_db_name,
448 CRef<CBlastOptions> (&(m_opt_handle->SetOptions())));
449 return s_CombineSearchSets(results, m_num_of_threads);
Declares the CBlastRPSOptionsHandle class.
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
#define BEGIN_SCOPE(ns)
Define a new scope.
Main class to perform a BLAST search on the local machine.
Defines the CNcbiApplication and CAppException classes for creating NCBI applications.
Defines NCBI C++ exception handling.
Defines classes: CDirEntry, CFile, CDir, CSymLink, CMemoryFile, CFileUtil, CFileLock,...
Multi-threading – mutexes; rw-locks; semaphore.
Multi-threading – classes, functions, and features.
Defines: CTimeFormat - storage class for time format.
NOTE: This file contains work in progress and the APIs are likely to change, please do not rely on th...
static const char delimiter[]
Declares the CLocalRPSBlast class.
Defines BLAST database access classes.