NCBI C++ ToolKit
volume_merge.cpp
Go to the documentation of this file.

Go to the SVN repository for this file.

1 /* $Id: volume_merge.cpp 36029 2007-11-13 02:00:31Z dicuccio $
2  * ===========================================================================
3  *
4  * PUBLIC DOMAIN NOTICE
5  * National Center for Biotechnology Information
6  *
7  * This software/database is a "United States Government Work" under the
8  * terms of the United States Copyright Act. It was written as part of
9  * the author's official duties as a United States Government employee and
10  * thus cannot be copyrighted. This software/database is freely available
11  * to the public for use. The National Library of Medicine and the U.S.
12  * Government have not placed any restriction on its use or reproduction.
13  *
14  * Although all reasonable efforts have been taken to ensure the accuracy
15  * and reliability of the software and data, the NLM and the U.S.
16  * Government do not and cannot warrant the performance or results that
17  * may be obtained by using this software or data. The NLM and the U.S.
18  * Government disclaim all warranties, express or implied, including
19  * warranties of performance, merchantability or fitness for any particular
20  * purpose.
21  *
22  * Please cite the author in any work or product based on this material.
23  *
24  * ===========================================================================
25  *
26  * Author: Anatoliy Kuznetsov, Mike DiCuccio
27  *
28  * File Description: Volume Merge
29  *
30  */
31 
32 #include <ncbi_pch.hpp>
33 #include <corelib/ncbi_limits.hpp>
35 
37 
38 
39 
// Constructor initializer list (the signature line is missing from this
// extract).  No merger or store is attached yet (both initialized with 0),
// the volume vector is flagged as owned (eTakeOwnership), and both the merge
// key and the min key start at 0.
41 : m_Merger(0),
42  m_OwnVolumeVect(eTakeOwnership),
43  m_Store(0),
44  m_MergeKey(0),
45  m_MinKey(0)
46 {
47 }
48 
// Deletes every walker stored in m_VolumeVect.
// NOTE(review): the signature (listing line 49) and listing line 51 are
// missing from this extract.  This is presumably the destructor, and the
// extra closing brace suggests the loop is wrapped in a condition
// (presumably an ownership check on m_OwnVolumeVect) -- confirm against the
// repository.
50 {
52  for (size_t i = 0; i < m_VolumeVect.size(); ++i) {
53  delete m_VolumeVect[i];
54  }
55  }
56 }
57 
59  EOwnership own)
60 {
    // Replace the current merge accumulator; `own` is forwarded to
    // AutoPtr::reset() as the ownership flag for the new pointer.
61  m_Merger.reset(merger, own);
    // NOTE(review): the statement guarded by this null check (listing line
    // 63) is missing from this extract -- presumably it passes
    // m_BufResourcePool to the merger via SetResourcePool(); confirm against
    // the repository.
62  if (merger) {
64  }
65 }
66 
/// Replace the set of input volumes to merge.
///
/// Walkers held from a previous call are deleted first, then the pointer
/// vector is copied and the ownership flag is recorded.
///
/// @param vol_vector  volume walkers to merge (pointers are copied)
/// @param own         ownership flag stored in m_OwnVolumeVect
67 void CMergeVolumes::SetVolumes(const vector<IMergeVolumeWalker*>& vol_vector,
68  EOwnership own)
69 {
    // NOTE(review): listing line 70 is missing from this extract; the extra
    // closing brace below suggests the delete loop is guarded by a condition
    // (presumably a check that the old vector is owned) -- confirm against
    // the repository.
71  for (size_t i = 0; i < m_VolumeVect.size(); ++i) {
72  delete m_VolumeVect[i];
73  }
74  }
75  m_VolumeVect = vol_vector;
76  m_OwnVolumeVect = own;
77 }
78 
79 
81  EOwnership own)
82 {
    // Replace the current merge store (destination); `own` is forwarded to
    // AutoPtr::reset() as the ownership flag for the new pointer.
83  m_Store.reset(store, own);
    // NOTE(review): the statement guarded by this null check (listing line
    // 85) is missing from this extract -- presumably it passes
    // m_BufResourcePool to the store via SetResourcePool(); confirm against
    // the repository.
84  if (store) {
86  }
87 }
88 
/// Return the async status of a component.
/// A null interface pointer is reported as IAsyncInterface::eReady;
/// otherwise the result of iasync->GetStatus() is returned.
/// (The signature line is missing from this extract; the accompanying index
/// lists it as taking an IAsyncInterface* named iasync.)
89 /// @internal
90 static
92 {
93  return
94  (iasync == 0) ? IAsyncInterface::eReady : iasync->GetStatus();
95 }
96 
97 
// Main merge loop.  (The signature line is missing from this extract; the
// accompanying index lists it as "void Run()".)
//
// Starts traversal on every input volume, then loops until every volume has
// been counted as finished.  Each pass resets the min-key evaluation, scans
// the volumes (re-scanning while anything is still pending), sends records
// whose key equals the current merge key to the merger, and stores the
// merged BLOB once the scan completes.  At the end the store and all volumes
// are closed.
//
// Throws CMerge_Exception (eInputVolumeFailure) when a volume reports that
// it is not good.
99 {
    // A store, a merger and at least one volume must have been set.
100  _ASSERT(m_Store.get());
101  _ASSERT(m_Merger.get());
102  _ASSERT(m_VolumeVect.size());
103
104  // initiate volume traverse
105  NON_CONST_ITERATE(vector<IMergeVolumeWalker*>, it, m_VolumeVect) {
106  (*it)->FetchFirst();
107  }
108
    // True while nothing has been handed to the merger for the current key.
109  bool merger_empty = true;
110  // volume scan exclude flag vector
111  vector<unsigned> vol_exclude(m_VolumeVect.size());
112  // volume end flag vector
113  vector<unsigned> vol_eof(m_VolumeVect.size());
    // NOTE(review): vector<unsigned>(n) already value-initializes its
    // elements to 0, so this loop does not change anything.
114  for (size_t i = 0; i < vol_exclude.size(); ++i) {
115  vol_eof[i] = 0;
116  }
117
118
119  // main merge loop
120  size_t pending_volumes = m_VolumeVect.size();
121  size_t processed_volumes = 0;
122  while (pending_volumes != processed_volumes)
123  {
124  x_ResetMinKey();
125  // initiate exclusion flags to avoid volume re-evaluation
126  // while spinning (exclude all eof volumes)
127  vol_exclude = vol_eof;
128  bool complete_scan;
129
130  // spin around volumes while not ready
131  do {
132  complete_scan = true;
133  for (size_t i = 0; i < m_VolumeVect.size(); ++i) {
134  // check if the volume has already been evaluated
135  if (vol_exclude[i]) continue;
136
137  IMergeVolumeWalker* volume = m_VolumeVect[i];
138  _ASSERT(volume);
139  if (!volume->IsGood()) {
140  NCBI_THROW(CMerge_Exception, eInputVolumeFailure,
141  "Input volume failed. Volume merge cannot recover.");
    // not reached: NCBI_THROW above throws
142  continue;
143  }
    // A finished volume is counted once and excluded from all later scans
    // through vol_eof.
144  if (volume->IsEof()) {
145  ++processed_volumes;
146  vol_exclude[i] = vol_eof[i] = 1;
147  continue;
148  }
149
    // Volumes without an async interface are treated as ready by
    // s_GetAsyncStatus().
150  IAsyncInterface::EStatus astatus =
151  s_GetAsyncStatus(volume->QueryIAsync());
152  switch (astatus)
153  {
    // NOTE(review): the four case labels (listing lines 154, 169, 175 and
    // 179) are missing from this extract.  Judging by the bodies they are
    // presumably eReady, eFailed, eNoMoreData and eNotReady, in that order
    // -- confirm against the repository.
155  {
156  Uint4 new_key = volume->GetUint4Key();
    // A record carrying the key currently held in the merger is merged
    // right away; any other key only competes for the next minimum.
157  if (!merger_empty && (new_key == m_MergeKey)) {
158  x_MergeVolume(volume);
159  // merge volume called Fetch, so
160  // new record is pending for evaluation
161  complete_scan = false;
162  vol_exclude[i] = 0;
163  } else {
164  x_EvaluateMinKey(new_key, i);
165  vol_exclude[i] = 1;
166  }
167  }
168  break;
170  NCBI_THROW(CMerge_Exception, eInputVolumeFailure,
171  "Input volume failed. Volume merge cannot recover.");
172  //++processed_volumes;
173  //vol_exclude[i] = 1;
    // not reached: NCBI_THROW above throws
174  break;
    // Same bookkeeping as the IsEof() branch above; the `continue` applies
    // to the enclosing for loop.
176  ++processed_volumes;
177  vol_exclude[i] = vol_eof[i] = 1;
178  continue;
180  complete_scan = false; // something is pending
181  break;
182  default:
183  _ASSERT(0);
184  } // switch
185  } // for each volume
186  } while (!complete_scan);
187
188  if (!merger_empty) {
189  // since we are out of the volume scan it means the merger
190  // has accumulated all current min keys and no more is coming
191  // we can store the BLOB now
192  x_StoreMerger();
193  merger_empty = true;
194  }
195
196  // merge next min key value BLOBs
197  if (!m_MinKeyCandidates.empty()) {
    // NOTE(review): listing line 198 is missing from this extract --
    // presumably it merges the candidates (x_MergeCandidates) and/or sets
    // m_MergeKey from m_MinKey; confirm against the repository.
199  m_MinKeyCandidates.resize(0);
200  merger_empty = false;
201  }
202
203  } // while pending volumes
204
    // Flush whatever is still held in the merger after the last pass.
205  if (!merger_empty) {
206  x_StoreMerger();
207  }
208
209  // closing input-output interfaces
210  m_Store->Close();
211  for (size_t i = 0; i < m_VolumeVect.size(); ++i) {
212  m_VolumeVect[i]->Close();
213  }
214 }
215 
// Store the BLOB accumulated in the merger under m_MergeKey.
// (The signature line is missing from this extract; Run() calls this as
// x_StoreMerger() and the accompanying index lists "void x_StoreMerger()".)
//
// Steps: wait until the store's async interface (if any) is done, merge in
// any BLOB the store already holds for the same key, take the merged buffer
// from the merger, reset the merger, and pass the buffer to the store.
//
// Throws CMerge_Exception (eStoreFailure) when the store is not good.
217 {
    // A store without an async interface (null pointer) skips the wait.
218  IAsyncInterface* iasync = m_Store->QueryIAsync();
219  while (iasync) {
220  IAsyncInterface::EStatus astatus = iasync->WaitReady();
221  switch (astatus)
222  {
    // NOTE(review): the case labels (listing lines 223, 226-227 and 231) are
    // missing from this extract.  Presumably the first body handles the
    // ready status (clearing iasync ends the wait loop), the second the
    // failure status(es), and the third the not-ready status (wait again)
    // -- confirm against the repository.
224  iasync = 0;
225  break;
228  NCBI_THROW(CMerge_Exception, eStoreFailure,
229  "Store device failed. Volume merge cannot recover.");
    // not reached: NCBI_THROW above throws
230  break;
232  break;
233  default:
234  _ASSERT(0);
235  } // switch
236  } // while
237
238
239  // check if store already has the same BLOB (if yes add to the merger)
240  //
241  TRawBuffer* store_blob_buffer = m_Store->ReadBlob(m_MergeKey);
242  if (store_blob_buffer) {
243  m_Merger->Merge(store_blob_buffer);
244  }
245
246  // merge
247  //
248  TRawBuffer* blob_buffer = m_Merger->GetMergeBuffer();
249  if (blob_buffer) {
    // The guard is constructed with the buffer pool and the merged buffer;
    // Release() below hands the pointer to the store instead of the pool.
    // NOTE(review): presumably the guard returns the buffer to the pool if
    // the throw below is taken -- confirm against the guard's definition.
250  TBufPoolGuard guard(m_BufResourcePool, blob_buffer);
251
252  m_Merger->Reset();
253
254  if (m_Store->IsGood()) {
255  // Place async. store request
256  m_Store->Store(m_MergeKey, guard.Release());
257  } else {
258  NCBI_THROW(CMerge_Exception, eStoreFailure,
259  "Store device failed. Volume merge cannot recover.");
260  }
261  }
    // The merge key is cleared whether or not a buffer was stored.
262  m_MergeKey = 0;
263 }
264 
// Move the current record of `volume` into the merger.
// (The signature line is missing from this extract; the accompanying index
// lists it as "void x_MergeVolume(IMergeVolumeWalker *volume)".)
//
// The record is copied into a buffer taken from m_BufResourcePool, the
// volume is told the record has moved, the next record is requested, and
// the copy is handed to the merger.
266 {
267  size_t buf_size;
268  const unsigned char* buf = volume->GetBufferPtr(&buf_size);
    // A record is expected to be present and non-empty.
269  _ASSERT(buf);
270  _ASSERT(buf_size);
271
272  // construct detachable buffer
273  TRawBuffer* blob_buffer = m_BufResourcePool.Get();
274  TBufPoolGuard guard(m_BufResourcePool, blob_buffer);
275  blob_buffer->resize(buf_size);
    // The copy is taken before Fetch() is called below.
    // NOTE(review): the index text for GetBufferPtr() suggests the next
    // Fetch call invalidates the volume's pointer -- confirm.
276  ::memcpy(&((*blob_buffer)[0]), buf, buf_size);
277
278  // inform the volume that record has been moved to a new location
279  volume->SetRecordMoved();
280
281  // place async fetch request
282  volume->Fetch();
283
284  // pass the raw buffer to the merger
    // Release() detaches the buffer from the guard and passes the raw
    // pointer to Merge().
285  m_Merger->Merge(guard.Release());
286 }
287 
// Merge the current record of every volume whose index is listed in
// m_MinKeyCandidates by calling x_MergeVolume() on each of them.
// (The signature line is missing from this extract; the accompanying index
// lists it as "void x_MergeCandidates()".  Listing line 294 is also absent;
// it may simply be blank -- confirm against the repository.)
289 {
290  ITERATE(vector<size_t>, it, m_MinKeyCandidates) {
291  size_t vol_idx = *it;
292  IMergeVolumeWalker* volume = m_VolumeVect[vol_idx];
293  x_MergeVolume(volume);
295  }
296 }
297 
298 void CMergeVolumes::x_EvaluateMinKey(unsigned new_key,
299  size_t volume_idx)
300 {
301  if (new_key < m_MinKey) {
302  m_MinKey = new_key;
303  m_MinKeyCandidates.resize(1);
304  m_MinKeyCandidates[0] = volume_idx;
305  return;
306  }
307  if (new_key == m_MinKey) { // new merge candidate
308  m_MinKeyCandidates.push_back(volume_idx);
309  }
310 }
// Reset the min-key evaluation by emptying the candidate list.
// (The signature line is missing from this extract; the accompanying index
// lists it as "void x_ResetMinKey()".)
// NOTE(review): listing line 313 is also missing -- presumably it resets
// m_MinKey to a maximum value such as kMax_UInt; confirm against the
// repository.
312 {
314  m_MinKeyCandidates.resize(0);
315 }
316 
void SetMergeStore(IMergeStore *store, EOwnership own=eTakeOwnership)
Set merge store (destination)
void x_MergeVolume(IMergeVolumeWalker *volume)
void SetVolumes(const vector< IMergeVolumeWalker * > &vol_vector, EOwnership own=eTakeOwnership)
Set merge volumes.
AutoPtr< IMergeBlob > m_Merger
TBufResourcePool m_BufResourcePool
void x_StoreMerger()
void Run()
Execute merge process (main merge loop)
vector< size_t > m_MinKeyCandidates
min-key volumes
void x_MergeCandidates()
Merge all discovered min key candidates.
EOwnership m_OwnVolumeVect
AutoPtr< IMergeStore > m_Store
unsigned m_MergeKey
key in the merger
void x_EvaluateMinKey(unsigned new_key, size_t volume_idx)
check if volume key is new minimum or equals to old minimum
void SetMergeAccumulator(IMergeBlob *merger, EOwnership own=eTakeOwnership)
Set merge accumulator component.
vector< IMergeVolumeWalker * > m_VolumeVect
void x_ResetMinKey()
Reset min. evaluation.
unsigned m_MinKey
min key value
Base Merge algorithms exception class.
Reallocable memory buffer (no memory copy overhead) Mimics vector<>, without the overhead of explicit...
void resize(size_type new_size)
BLOB merge interface, merges one or more BLOBs together.
void SetResourcePool(CMergeVolumes::TBufResourcePool &res_pool)
Set resource pool for BLOB buffer memory management.
virtual void Reset()=0
Reset merging, forget all the accumulated buffers.
virtual CMergeVolumes::TRawBuffer * GetMergeBuffer()=0
Returns destination (merged) buffer Caller MUST return the buffer to the buffer pool.
virtual void Merge(CMergeVolumes::TRawBuffer *buffer)=0
Merge request Implementation MUST return the buffer to the pool.
void reset(element_type *p=0, EOwnership ownership=eTakeOwnership)
Reset will delete the old pointer (if owned), set content to the new value, and assume the ownership ...
Definition: ncbimisc.hpp:480
#define ITERATE(Type, Var, Cont)
ITERATE macro to sequence through container elements.
Definition: ncbimisc.hpp:815
element_type * get(void) const
Get pointer.
Definition: ncbimisc.hpp:469
#define NON_CONST_ITERATE(Type, Var, Cont)
Non constant version of ITERATE macro.
Definition: ncbimisc.hpp:822
@ eTakeOwnership
An object can take ownership of another.
Definition: ncbi_types.h:136
#define NCBI_THROW(exception_class, err_code, message)
Generic macro to throw an exception, given the exception class, error code and message string.
Definition: ncbiexpt.hpp:704
uint32_t Uint4
4-byte (32-bit) unsigned integer
Definition: ncbitype.h:103
#define kMax_UInt
Definition: ncbi_limits.h:185
Value * Get()
Get object from the pool.
Pool::TValue * Release()
Return the pointer to the caller, not to the pool.
#define END_NCBI_SCOPE
End previously defined NCBI scope.
Definition: ncbistl.hpp:103
#define BEGIN_NCBI_SCOPE
Define ncbi namespace.
Definition: ncbistl.hpp:100
enum ENcbiOwnership EOwnership
Ownership relations between objects.
char * buf
int i
Interface defines async.
virtual EStatus GetStatus() const =0
Get current interface async. status.
@ eNoMoreData
Interface reached the end (EOF)
@ eFailed
Last operation failed and interface cannot recover.
@ eReady
Volume is ready.
@ eNotReady
Last request did not finish yet.
virtual EStatus WaitReady() const =0
Wait until interface is ready (or operation fails) (On failure volume is free to throw an exception)
Interface to store merged BLOBs.
virtual void Close()=0
Close storage (when it ends) Method is responsible for finalization of store procedure,...
virtual IAsyncInterface * QueryIAsync()=0
Get pointer to async.
virtual CMergeVolumes::TRawBuffer * ReadBlob(Uint4 blob_id)=0
Read buffer with the specified blob_id This method is for store update, when we are merging into an e...
virtual void Store(Uint4 blob_id, CMergeVolumes::TRawBuffer *buffer)=0
Store BLOB request. This request can be asynchronous; the caller needs to check status using IAsyncInterface...
virtual bool IsGood() const =0
Return TRUE if storage device is in good shape.
void SetResourcePool(CMergeVolumes::TBufResourcePool &res_pool)
Set resource pool for BLOB buffer memory management.
Interface to traverse volume for merge.
virtual bool IsEof() const =0
Return TRUE when volume traverse reaches the end.
virtual bool IsGood() const =0
Return TRUE if volume is in good condition (not failed)
virtual Uint4 GetUint4Key() const =0
Get access to the key as unsigned integer (if this type is supported)
virtual void Fetch()=0
Request to get next record. This request can be asynchronous; the caller needs to check status using IAsyncI...
virtual IAsyncInterface * QueryIAsync()=0
Get pointer to async.
virtual const unsigned char * GetBufferPtr(size_t *buf_size) const =0
Get low level access to the merge BLOB buffer and buffer size (next Fetch call invalidates this point...
virtual void SetRecordMoved()=0
Signals that current record moved to merged storage (volume manager may decide to delete it later) Vo...
#define _ASSERT
static IAsyncInterface::EStatus s_GetAsyncStatus(IAsyncInterface *iasync)
Modified on Fri Sep 20 14:57:50 2024 by modify_doxy.py rev. 669887