1 /* $Id: columnar_vcf_reader.cpp 47464 2023-04-20 00:19:10Z evgeniev $
2  * ===========================================================================
3  *
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Authors: Andrea Asztalos, Anatoliy Kuznetsov
27  *
28  * File Description:
29  *
30  *
31  */
33 #include <ncbi_pch.hpp>
36 #include <util/line_reader.hpp>
39 #include <util/bitset/bmdbg.h>
46 #include <chrono>
47 #include <numeric>
48 #include <future>
55 // SVcfFieldData
56 SVcfFieldData::SVcfFieldData(const string& line)
57 {
58  edit::CParseTextOptions options;
59  options.SetStartText("ID=");
60  options.SetStopText(",");
61  m_Name = options.GetSelectedText(line);
63  options.SetStartText(",Number=");
64  options.SetStopText(",");
65  m_Number = options.GetSelectedText(line);
67  options.SetStartText(",Description=\"");
68  options.SetStopText("\"");
69  m_Description = options.GetSelectedText(line);
70 }
72 // CColumnarVCFReader
{
    // Scans the '#'-prefixed header section of a VCF stream and records:
    //  - the ##fileformat version (m_VCFversion),
    //  - the ##reference line (m_ReferenceLine), plus an "accession=" value
    //    parsed from it when it looks like a URL (contains ":/"),
    //  - one SVcfFieldData per distinct ##INFO ID (m_InfoFields),
    //  - the sample column names from the "#CHROM" line.
    // Returns false if `canceled` reports cancellation (header state is reset
    // first), true otherwise. A missing ##fileformat line is reported as a
    // critical error.
    // NOTE(review): this definition's signature and several statements
    // (including the construction of `err` below) are missing from this
    // listing.
    CStreamLineReader reader(in);
    // NOTE(review): nr_lines is never incremented in this body, so the cancel
    // check below runs on every line rather than every 200th -- confirm intent.
    unsigned nr_lines = 0;

    x_ResetInfo();

    do {
        if (nr_lines % 200 == 0 && (canceled && canceled->IsCanceled())) {
            // Do not leave a partially-read header behind.
            x_ResetInfo();
            return false;
        }

        ++reader;
        CTempString line = *reader;
        if (line.empty()) {
            // In a do/while, `continue` jumps to the PeekChar() condition.
            continue;
        }

        if (!NStr::StartsWith(line, "#"))
            break;

        if (NStr::StartsWith(line, "##fileformat")) {
            // it is required to be present:
            m_VCFversion = line.substr(CTempString("##fileformat=").length());
        }
        else if (NStr::StartsWith(line, "##reference")) {
            // the reference genome used to generate the VCF file
            m_ReferenceLine = line.substr(CTempString("##reference=").length());
            if (!m_ReferenceLine.empty()) {
                if (m_ReferenceLine.find(":/") == NPOS) {
                    // NOTE(review): no statement for the non-URL case is
                    // visible in this listing -- confirm what it should set.
                }
                else {
                    // URL-style reference: take the text after "accession="
                    // up to the next ',' or, if that yields nothing, up to '>'.
                    edit::CParseTextOptions options;
                    options.SetStartText("accession=");
                    options.SetStopText(",");
                    m_Assembly = options.GetSelectedText(line);
                    if (m_Assembly.empty()) {
                        options.SetStopText(">");
                        m_Assembly = options.GetSelectedText(line);
                    }
                }
            }
        }
        else if (NStr::StartsWith(line, "##INFO")) {
            // Keep only the first definition seen for each INFO field ID.
            CConstRef<SVcfFieldData> field_data(new SVcfFieldData(line));
            auto found_it = find_if(m_InfoFields.begin(), m_InfoFields.end(),
                [&field_data](const CConstRef<SVcfFieldData>& data) { return data->m_Name == field_data->m_Name; });
            if (found_it == m_InfoFields.end()) {
                m_InfoFields.insert(field_data);
            }
        }
        else if (NStr::StartsWith(line, "#CHROM")) {
            x_GatherSampleColNames(line, listener, static_cast<unsigned int>(reader.GetLineNumber()));
        }
    } while (reader.PeekChar() == '#');

    if (m_VCFversion.empty()) {
        // NOTE(review): the statements creating `err` are missing from this
        // listing; only part of its argument list survives.
        1,
        "Line starting with ##fileformat is missing",
        x_ProcessCriticalError(*err, listener);
    }
    return true;
}
// Splits the tab-delimited "#CHROM" header line into column names and stores
// every column after the FORMAT column in m_SampleCols, keyed by its 0-based
// position among those sample columns. Lines not starting with "#CHROM" are
// ignored. A header that contains a space or no tab, and a header with
// duplicated column names, are reported through x_ProcessError.
// NOTE(review): the statements creating `err` in both error branches are
// missing from this listing.
void CColumnarVCFReader::x_GatherSampleColNames(const string& header_line, ILineErrorListener* listener, unsigned line_nr)
{
    if (!NStr::StartsWith(header_line, "#CHROM")) {
        return;
    }

    if (header_line.find(" ") != NPOS || header_line.find("\t") == NPOS) {
        line_nr,
        "Header line expected to be tab delimited",
        x_ProcessError(*err, listener);
    }
    vector<string> col_names;
    NStr::Split(header_line, "\t", col_names);

    bool is_unique = false;
    {
        set<string> unique_strs(col_names.begin(), col_names.end());
        is_unique = (col_names.size() == unique_strs.size());
    }
    // Duplicate sample IDs are not allowed.
    if (!is_unique) {
        line_nr,
        "Column names are not unique",
        x_ProcessError(*err, listener);
    }

    // Advance to the column just after FORMAT. If there is no FORMAT column,
    // `it` ends at end() and no sample columns are recorded.
    auto it = col_names.begin();
    for (; it != col_names.end(); ++it) {
        if (*it == CVCFVariantsBase::sm_FORMAT) {
            ++it;
            break;
        }
    }

    // NOTE(review): m_SampleCols is not cleared here -- confirm that callers
    // reset it before a second header line is processed.
    unsigned index = 0;
    for (; it != col_names.end(); ++it, ++index) {
        m_SampleCols.emplace(index, *it);
    }
}
193 // From ReaderBase
{
    // Reports a non-fatal problem. Without an error container the message is
    // only logged. Otherwise it is handed to the container, and `err` is
    // thrown only if PutError() returns false.
    // NOTE(review): this body's signature is missing from this listing.
    if (!error_cont) {
        // NOTE(review): nothing separates SeverityStr() from Message() in
        // this log line.
        LOG_POST(Error << err.GetLine() << ": " << err.SeverityStr() << err.Message());
        return;
    }
    if (!error_cont->PutError(err)) {
        err.Throw();
    }
}
205 // From ReaderBase
{
    // Reports a recoverable error. Without an error container, `err` is
    // thrown immediately. If the container's PutError() returns false, a
    // separate "Error allowance exceeded" exception (pErr) is thrown instead.
    // NOTE(review): this body's signature and the statements creating `pErr`
    // are missing from this listing.
    if (!error_cont) {
        err.Throw();
    }
    if (!error_cont->PutError(err)) {
        0,
        "Error allowance exceeded",
        pErr->Throw();
    }
}
{
    // Reports an unrecoverable error: every path ends in a Throw() call. When
    // a container exists, the error is first offered to it so it gets
    // recorded. If the container accepted it, `err` itself is thrown;
    // otherwise an "Error allowance exceeded" exception (pErr) is thrown.
    // NOTE(review): this body's signature and the statements creating `pErr`
    // are missing from this listing.
    if (!error_cont) {
        err.Throw();
    }
    bool placed = error_cont->PutError(err);
    if (placed) {
        err.Throw();
    } else {
        0,
        "Error allowance exceeded",
        pErr->Throw();
    }
}
{
    // Reads VCF data lines from `in` and groups the variants by their CHROM
    // value into m_ChromosomeMap, one CVCFVariantList per chromosome.
    // Whenever the CHROM value changes, the list of the chromosome just
    // finished is passed to OptimizeVariantsList. That runs on a std::async
    // task when the chromosome had more than kAsyncVarsThreshold lines, and
    // inline otherwise.
    // "#CHROM" lines are validated via x_ProcessHeaderLine. Other '#' lines
    // and empty lines are skipped.
    // Returns false if `canceled` reports cancellation and true on success.
    // Throws CIO_Exception if the stream fails before EOF. When
    // on_variants_list_ready is set, m_ChromosomeMap is emptied at the end.
    // NOTE(review): this body's signature and several statements are missing
    // or truncated in this listing: error construction, the arguments of the
    // OptimizeVariantsList calls, and the value passed to emplace.
    auto start = chrono::steady_clock::now();
    m_ChromosomeMap.clear();

    unsigned nr_lines = 0;
    unsigned lines_per_contig = 0;

    // Non-owning pointer to the list being filled; the CRef stored in
    // m_ChromosomeMap owns it.
    CVCFVariantList *vars_list{ nullptr };
    string previous_chrom;

    // Outstanding background optimizations. JoinOptimization() waits for all
    // of them; future::get() also rethrows any exception a task threw.
    vector<future<void>> async_calls;
    auto JoinOptimization = [&async_calls]()
    {
        // Wait for the optimization to complete
        for (auto& task : async_calls) {
            task.get();
        }
    };

    // Finalizes one chromosome's list and logs the time taken. If a consumer
    // callback was supplied, the list is handed to it and the CRef passed in
    // is reset. The CRef is taken by reference, so the caller's CRef is the
    // one reset.
    // NOTE(review): this may run on a std::async worker, so the callback can
    // be invoked from other threads, possibly concurrently.
    auto OptimizeVariantsList = [on_variants_list_ready](CRef<CVCFVariantList> &var_map) {
        auto chr = var_map->GetChrName();
        auto task_start = chrono::steady_clock::now();
        {
            auto opt_start = chrono::steady_clock::now();
            var_map->FinalizeReading();
            auto diff_opt = chrono::steady_clock::now() - opt_start;
            LOG_POST(Info << "Optimization of " << chr << " took " << chrono::duration_cast<chrono::milliseconds>(diff_opt).count() << " ms");
        }
        if (on_variants_list_ready) {
            on_variants_list_ready(*var_map);
            var_map.Reset();
        }
        auto diff_opt = chrono::steady_clock::now() - task_start;
        //LOG_POST(Info << "Processing of " << chr << " took " << chrono::duration_cast<chrono::milliseconds>(diff_opt).count() << " ms");
    };

    string line;
    while (in.good() && !in.eof()) {
        // Poll for cancellation once every 1000 lines.
        if (nr_lines % 1000 == 0 && (canceled && canceled->IsCanceled())) {
            // NOTE(review): a statement is missing from this listing here; the
            // post-loop cancel path calls JoinOptimization() before returning.
            return false;
        }

        NcbiGetlineEOL(in, line);
        nr_lines++;

        if (NStr::StartsWith(line, "#CHROM")) {
            x_ProcessHeaderLine(line, nr_lines, listener);
            continue;
        }

        // ignore empty and comment lines
        if (line.empty() || (!line.empty() && line[0] == '#')) {
            continue;
        }

        if (line.find("\t") == NPOS) {
            nr_lines,
            "Has been skipped as it is not tab delimited:\n" + line,
            x_ProcessWarning(*err, listener);
            continue;
        }

        // The CHROM value is the text before the first tab.
        size_t pos = line.find("\t");
        string chrom = line.substr(0, pos);
        if (chrom != previous_chrom) {
            // A new CHROM block starts, so finalize the previous chromosome.
            // std::launch::async | std::launch::deferred lets the library pick
            // either policy; a deferred task runs only when
            // JoinOptimization() calls get().
            // NOTE(review): the argument lists of the two calls below are
            // truncated in this listing.
            if (!previous_chrom.empty()) {
                if (lines_per_contig > kAsyncVarsThreshold)
                    async_calls.push_back(async(std::launch::async | std::launch::deferred, OptimizeVariantsList, std::ref(;
                else
                    OptimizeVariantsList(;
            }
            previous_chrom = chrom;
            lines_per_contig = 0;

            if (m_ChromosomeMap.find(chrom) == m_ChromosomeMap.end()) {
                // First block for this chromosome: create its list.
                auto inserted = m_ChromosomeMap.emplace(chrom,
                vars_list = inserted.first->second.GetPointer();
            }
            else {
                // NOTE(review): previous_chrom is already set to chrom, but
                // vars_list is not updated on this path. Later lines of this
                // out-of-order block (chrom == previous_chrom) therefore skip
                // this branch and are parsed through the stale vars_list of
                // the preceding chromosome. That list was just handed to
                // OptimizeVariantsList, which may finalize it on another
                // thread and Reset() it -- confirm and fix once the truncated
                // lines are restored.
                nr_lines,
                chrom + " data line found out of its block. All entries for a specific CHROM should form a contiguous block within the VCF file.",
                x_ProcessError(*err, listener);
                continue;
            }
        }

        if (prog_func && lines_per_contig > 0 && lines_per_contig % 500000 == 0) {
            string progress = "Parsed " + NStr::UInt8ToString(lines_per_contig) + " lines";
            prog_func(progress);
        }

        // A line that fails to parse is reported as a warning and skipped.
        try {
            _ASSERT(vars_list);
            vars_list->ParseLine(line);
            lines_per_contig++;
        }
        catch (const CException& e) {
            nr_lines,
            e.GetMsg(),
            x_ProcessWarning(*err, listener);
        }
    }

    auto diff_parsing = chrono::steady_clock::now() - start;
    LOG_POST(Info << "Parsed " << nr_lines << " lines from VCF file in "
        << chrono::duration_cast<chrono::milliseconds>(diff_parsing).count() << " ms ");

    if (canceled && canceled->IsCanceled()) {
        // Wait for the optimization to complete
        JoinOptimization();
        return false;
    }

    if (!in.eof() && !in.good()) {
        LOG_POST(Error << "Reading cannot be completed, as input stream is corrupted");
        // Wait for the optimization to complete
        JoinOptimization();
        NCBI_THROW(CIO_Exception, eUnknown, "Failed to read all variants from file");
    }

    // Finalize the last chromosome read.
    // NOTE(review): the condition and argument below are truncated in this
    // listing.
    if (!previous_chrom.empty() && {
        OptimizeVariantsList(;
    }
    // Wait for the optimization to complete
    JoinOptimization();
    if (on_variants_list_ready) {
        // Chromosomes were processed in the callback and the list can be released
        m_ChromosomeMap.clear();
    }

    return true;
}
// Validates a "#CHROM" header line and then selects the sample columns to
// load via x_GetSamplesToLoad. The line must be tab-delimited (no spaces, at
// least one tab) and have at least the 8 mandatory VCF columns; violations
// are reported as critical errors.
// Returns the number of tab characters in the line (column count - 1).
// NOTE(review): the statements creating `err` in both error branches are
// missing from this listing.
unsigned CColumnarVCFReader::x_ProcessHeaderLine(const string& header_line, unsigned line_nr, ILineErrorListener* listener)
{
    _ASSERT(NStr::StartsWith(header_line, "#CHROM"));

    if (header_line.find(" ") != NPOS || header_line.find("\t") == NPOS) {
        line_nr,
        "Header line is expected to be tab delimited",
        x_ProcessCriticalError(*err, listener);
    }

    // the number of mandatory columns of a VCF file
    const unsigned kMandatoryCols = 8;
    unsigned nr_tabs = static_cast<unsigned>(count(header_line.begin(), header_line.end(), '\t'));
    if (nr_tabs + 1 < kMandatoryCols) {
        line_nr,
        "Header line is expected to have at least 8 columns",
        x_ProcessCriticalError(*err, listener);
    }

    x_GetSamplesToLoad(header_line, listener, line_nr);
    return nr_tabs;
}
{
    // Empties m_ChromosomeMap, dropping the reader's references to all
    // per-chromosome variant lists.
    // NOTE(review): this body's signature is missing from this listing.
    m_ChromosomeMap.clear();
}
{
    // Empties the header-derived strings: file format version, reference
    // line and assembly accession.
    // NOTE(review): this body's signature and two further statements before
    // the closing brace are missing from this listing.
    m_VCFversion.resize(0);
    m_ReferenceLine.resize(0);
    m_Assembly.resize(0);
}
// Chooses which sample columns of the "#CHROM" header line to record. The
// choice depends on m_LoadAllSamples and on whether m_LoadSamples (an
// explicit selection) is empty. Only one case is visible in this listing:
// load-all with no explicit selection, which records every sample column via
// x_GatherSampleColNames.
// NOTE(review): the statements of the other three branches (and one line
// before the first `if`) are missing from this listing.
void CColumnarVCFReader::x_GetSamplesToLoad(const string& header_line, ILineErrorListener* listener, unsigned line_nr)
{
    if (m_LoadAllSamples) {
        if (m_LoadSamples.empty()) {
            x_GatherSampleColNames(header_line, listener, line_nr);
        }
        else {
        }
    }
    else {
        if (m_LoadSamples.empty()) {
        }
        else {
        }
    }
}
// Reads VCF data lines from `in`, keeping only the chromosomes requested in
// chr_list. Each chr_list entry pairs a Seq-id with the CHROM names
// (synonyms) that may denote it. A CHROM value matches when NStr::EqualCase
// finds it equal to one of the synonyms.
// Returns one (Seq-id, CVCFVariantList) pair per requested chromosome found,
// in the order their blocks appear in the file. Reading stops once
// search_chrs reaches zero. A warning is reported for each requested
// chromosome that was not found. If on_variants_list_ready is not set, the
// lists are also stored in m_ChromosomeMap under their CHROM names.
// Returns an empty vector on cancellation. Throws CIO_Exception if the
// stream fails before EOF.
// NOTE(review): the line holding this function's name and the
// error-construction statements are missing from this listing.
vector<CColumnarVCFReader::TSeqIdVarsListPair>
    const vector<pair<CConstRef<CSeq_id>, vector<string>>>& chr_list,
    ICanceled* canceled,
    ILineErrorListener* listener,
    TReportProgress prog_func,
    TOnVCFVariantListReady on_variants_list_ready)
{
    vector<pair<CConstRef<CSeq_id>, CRef<CVCFVariantList>>> variants_list;

    auto start = chrono::steady_clock::now();

    unsigned nr_lines = 0;
    unsigned lines_per_contig = 0;

    // List for the requested chromosome currently being read (null when the
    // current CHROM block is not requested).
    CRef<CVCFVariantList> vcf_vars;
    // NOTE(review): decremented only when a requested block is followed by an
    // unrequested CHROM (the !found branch below). A requested block directly
    // followed by another requested block does not decrement it, so the early
    // exit may not trigger -- confirm.
    size_t search_chrs = chr_list.size();
    string prev_chrom; // chromosome from the previous line

    // Outstanding background optimizations; future::get() in
    // JoinOptimization() waits for each and rethrows any task exception.
    vector<future<void>> async_calls;
    auto JoinOptimization = [&async_calls]()
    {
        // Wait for the optimization to complete
        for (auto& task : async_calls) {
            task.get();
        }
    };

    // Finalizes one list and optionally hands it to the consumer callback.
    // The CRef is taken by value, so the Reset() below drops only this copy;
    // variants_list keeps the list alive until the caller releases the
    // returned vector.
    auto OptimizeVariantsList = [on_variants_list_ready](CRef<CVCFVariantList> var_map) {
        auto chr = var_map->GetChrName();
        auto task_start = chrono::steady_clock::now();
        {
            auto opt_start = chrono::steady_clock::now();
            var_map->FinalizeReading();
            auto diff_opt = chrono::steady_clock::now() - opt_start;
            LOG_POST(Info << "Optimization of " << chr << " took " << chrono::duration_cast<chrono::milliseconds>(diff_opt).count() << " ms");
        }
        if (on_variants_list_ready) {
            on_variants_list_ready(*var_map);
            var_map.Reset();
        }
        auto diff_opt = chrono::steady_clock::now() - task_start;
        //LOG_POST(Info << "Processing of " << chr << " took " << chrono::duration_cast<chrono::milliseconds>(diff_opt).count() << " ms");
    };

    // Finalizes vcf_vars: on a std::async task if the current block exceeded
    // kAsyncVarsThreshold lines, inline otherwise. std::async copies its
    // arguments, so the task holds its own reference to the list.
    auto CallOptimizeVarsList = [&]() {
        if (lines_per_contig > kAsyncVarsThreshold)
            async_calls.push_back(async(std::launch::async | std::launch::deferred, OptimizeVariantsList, vcf_vars));
        else
            OptimizeVariantsList(vcf_vars);
    };

    string line;
    while (in.good() && !in.eof() && search_chrs > 0) {
        // Poll for cancellation once every 1000 lines.
        if (nr_lines % 1000 == 0 && (canceled && canceled->IsCanceled())) {
            // Wait for the optimization to complete
            JoinOptimization();
            variants_list.clear();
            return variants_list;
        }

        NcbiGetlineEOL(in, line);
        nr_lines++;

        if (NStr::StartsWith(line, "#CHROM")) {
            x_ProcessHeaderLine(line, nr_lines, listener);
            continue;
        }

        // ignore empty and comment lines
        if (line.empty() || (!line.empty() && line[0] == '#')) {
            continue;
        }

        if (line.find("\t") == NPOS) {
            nr_lines,
            "Has been skipped as it is not tab delimited:\n" + line,
            x_ProcessWarning(*err, listener);
            continue;
        }

        // The CHROM value is the text before the first tab.
        size_t pos = line.find("\t");
        string chrom = line.substr(0, pos);
        // No active list, or this line belongs to a different chromosome.
        if (!vcf_vars || (vcf_vars && !NStr::EqualCase(vcf_vars->GetChrName(), chrom))) {
            // Still inside a block already identified as unrequested.
            if (prev_chrom == chrom)
                continue;

            bool found = false;
            for (const auto& syn_it : chr_list) {
                const auto& seq_id = syn_it.first;
                const auto& synonyms = syn_it.second;
                if (find_if(synonyms.begin(), synonyms.end(),
                    [&chrom](const string& elem) { return NStr::EqualCase(chrom, elem); }) != synonyms.end()) {
                    found = true;
                    // Finish the previous requested chromosome, if any, and
                    // drop this reference (variants_list still holds the list).
                    if (vcf_vars) {
                        CallOptimizeVarsList();
                        lines_per_contig = 0;
                        vcf_vars.Release();
                    }

                    if (find_if(variants_list.begin(), variants_list.end(),
                        [&seq_id](const TSeqIdVarsListPair& elem) { return (seq_id->AsFastaString() == elem.first->AsFastaString()); }) == variants_list.end()) {
                        variants_list.emplace_back(seq_id, new CVCFVariantList(chrom, m_LoadAllInfo, m_LoadInfoFields, m_SampleCols));
                        vcf_vars = variants_list.back().second;
                    }
                    else {
                        // NOTE(review): vcf_vars stays null and prev_chrom is
                        // not updated here. Every further line of this
                        // out-of-order block therefore re-enters the synonym
                        // search and reports this error again.
                        nr_lines,
                        chrom + " data line found out of its block. All entries for a specific CHROM should form a contiguous block within the VCF file.",
                        x_ProcessError(*err, listener);
                    }

                    break;
                }
            }
            if (!found) {
                // Leaving a requested chromosome for an unrequested one.
                if (vcf_vars) {
                    CallOptimizeVarsList();
                    vcf_vars.Release();
                    vcf_vars.Reset(nullptr);
                    search_chrs--;
                }
                prev_chrom = chrom;
                lines_per_contig = 0;
            }
        }

        if (vcf_vars) {
            if (prog_func && lines_per_contig > 0 && lines_per_contig % 500000 == 0) {
                string progress = "Parsed " + NStr::UInt8ToString(lines_per_contig) + " lines";
                prog_func(progress);
            }
            // A line that fails to parse is reported as a warning and skipped.
            try {
                vcf_vars->ParseLine(line);
                lines_per_contig++;
            }
            catch (const CException& e) {
                nr_lines,
                e.GetMsg(),
                x_ProcessWarning(*err, listener);
            }
            // Stop once a chromosome exceeds m_VariationsLimit (0 disables it).
            if ((m_VariationsLimit > 0) && (lines_per_contig > m_VariationsLimit)) {
                "The file exceeds the limit of " + NStr::UIntToString(m_VariationsLimit) + " variations per chromosome.",
                x_ProcessCriticalError(*err, listener);
                break;
            }
        }
    }

    auto diff_parsing = chrono::steady_clock::now() - start;
    LOG_POST(Info << "Parsed " << nr_lines << " lines in "
        << chrono::duration_cast<chrono::milliseconds>(diff_parsing).count() << " ms ");

    if (canceled && canceled->IsCanceled()) {
        // Wait for the optimization to complete
        JoinOptimization();
        variants_list.clear();
        return variants_list;
    }

    if (!in.good() && !in.eof()) {
        LOG_POST(Error << "Reading cannot be completed, as input stream is corrupted");
        // Wait for the optimization to complete
        JoinOptimization();
        variants_list.clear();
        NCBI_THROW(CIO_Exception, eUnknown, "Failed to read all variants from file");
    }

    // Finalize the chromosome that was still being read when the loop ended.
    if (vcf_vars) {
        OptimizeVariantsList(vcf_vars);
    }
    // Wait for the optimization to complete
    JoinOptimization();

    // Warn about each requested chromosome that has no list.
    if (chr_list.size() != variants_list.size()) {
        for (const auto& chr_it : chr_list) {
            if (find_if(variants_list.begin(), variants_list.end(),
                [&chr_it](const pair<CConstRef<CSeq_id>, CRef<CVCFVariantList>>& elem)
                { return elem.first->Equals(*chr_it.first); }) == variants_list.end()) {
                auto id_str = chr_it.first->AsFastaString();
                nr_lines,
                "Chromosome " + id_str + " is not in the file",
                x_ProcessWarning(*err, listener);
            }
        }
    }

    if (!on_variants_list_ready) {
        for (auto& var_it : variants_list) {
            m_ChromosomeMap.emplace(var_it.second->GetChrName(), var_it.second);
        }
    }

    return variants_list;
}
{
    // Collects the keys of m_ChromosomeMap (the chromosome names currently
    // held by the reader) in the map's iteration order.
    // NOTE(review): this body's signature is missing from this listing.
    vector<string> names;
    for (const auto& it : m_ChromosomeMap) {
        names.push_back(it.first);
    }
    return names;
}
{
    // Looks up the variant list stored for chr_name. Returns a null CRef when
    // m_ChromosomeMap has no entry for it.
    // NOTE(review): this body's signature is missing from this listing.
    auto it = m_ChromosomeMap.find(chr_name);
    if (it != m_ChromosomeMap.end()) {
        return it->second;
    }
    return CRef<CVCFVariantList>();
}
695 // CColumnarVCFReaderTest
{
    // Test helper: calls GetStatistics(out) on every chromosome's variant list.
    // NOTE(review): this body's signature is missing from this listing.
    for (auto& it : m_ChromosomeMap) {
        it.second->GetStatistics(out);
    }
}
{
    // Test helper: calls SerializeVariantData(prefix, out) on every
    // chromosome's variant list.
    // NOTE(review): this body's signature is missing from this listing.
    for (auto&& it : m_ChromosomeMap) {
        it.second->SerializeVariantData(prefix, out);
    }
}
{
    // Test helper: calls DeserializeAndCheck(prefix, out) on every
    // chromosome's variant list.
    // NOTE(review): this body's signature is missing from this listing.
    for (auto&& it : m_ChromosomeMap) {
        it.second->DeserializeAndCheck(prefix, out);
    }
}
{
    // Test helper: calls List(out, only_sv_cols) on every chromosome's
    // variant list.
    // NOTE(review): this body's signature is missing from this listing.
    for (const auto& it : m_ChromosomeMap) {
        it.second->List(out, only_sv_cols);
    }
}
{
    // Test helper: calls ListPositionVectors(out) on every chromosome's
    // variant list.
    // NOTE(review): this body's signature is missing from this listing.
    for (auto& it : m_ChromosomeMap) {
        it.second->ListPositionVectors(out);
    }
}
