The Pedigree Project  0.1
Buffer.cc
1 /*
2  * Copyright (c) 2008-2014, Pedigree Developers
3  *
4  * Please see the CONTRIB file in the root of the source tree for a full
5  * list of contributors.
6  *
7  * Permission to use, copy, modify, and distribute this software for any
8  * purpose with or without fee is hereby granted, provided that the above
9  * copyright notice and this permission notice appear in all copies.
10  *
11  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
12  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
13  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
14  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
15  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
16  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
17  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18  */
19 
20 #include "pedigree/kernel/utilities/Buffer.h"
21 #include "pedigree/kernel/LockGuard.h"
22 #include "pedigree/kernel/utilities/utility.h"
23 
24 #ifdef THREADS
25 #include "pedigree/kernel/process/Thread.h"
26 #endif
27 
28 template <class T, bool allowShortOperation>
30  : m_BufferSize(bufferSize), m_DataSize(0), m_Lock(false),
31  m_WriteCondition(), m_ReadCondition(), m_Segments(), m_MonitorTargets(),
32  m_bCanRead(true), m_bCanWrite(true)
33 {
34 }
35 
36 template <class T, bool allowShortOperation>
38 {
39  // Wake up all readers and writers to finish up existing operations.
40  disableReads();
41  disableWrites();
42 
43  // Clean up the entire buffer.
44  wipe();
45 
46  // Clean up monitor targets.
47  m_Lock.acquire();
48  for (auto pTarget : m_MonitorTargets)
49  {
50 #ifdef THREADS
51  if (pTarget->pSemaphore)
52  {
53  pTarget->pSemaphore->release();
54  }
55 #endif
56  delete pTarget;
57  }
58  m_MonitorTargets.clear();
59  m_Lock.release();
60 }
61 
62 template <class T, bool allowShortOperation>
63 size_t
64 Buffer<T, allowShortOperation>::write(const T *buffer, size_t count, bool block)
65 {
66  if (!block)
67  {
68  if (!m_Lock.tryAcquire())
69  {
70  // can't unlock buffer for writing
71  return 0;
72  }
73  }
74  else
75  {
76  // can block!
77  m_Lock.acquire();
78  }
79 
80  size_t countSoFar = 0;
81  while (true)
82  {
83  // Can we write?
84  if (!m_bCanWrite)
85  {
86  // No! Maybe not anymore, so return what we've written so far.
87  break;
88  }
89 
90  // Do we have space?
91  size_t bytesAvailable = m_BufferSize - m_DataSize;
92  if (!bytesAvailable)
93  {
94  if (!block)
95  {
96  // Cannot block!
97  break;
98  }
99 
100  // Can any reader get us out of this situation?
101  if (!m_bCanRead)
102  {
103  // No. Return what we've written so far.
104  break;
105  }
106 
107  // No, we need to wait.
109  m_WriteCondition.wait(m_Lock);
110  if (result.hasError())
111  {
112  return countSoFar;
113  }
114  continue;
115  }
116 
117  // Yes, we have room.
118  size_t totalCount = bytesAvailable;
119  if (totalCount > count)
120  {
121  totalCount = count;
122  }
123 
124  // If we're allowed to just give up if we don't have enough bytes, do
125  // so (e.g. for TCP buffers and the like).
126  if (allowShortOperation && (totalCount > bytesAvailable))
127  {
128  totalCount = bytesAvailable;
129  count = bytesAvailable;
130  if (!totalCount)
131  {
132  break;
133  }
134  }
135 
136  // Add the segment.
137  bool copiedData = false;
138  while (totalCount)
139  {
140  size_t numberCopied = 0;
141  if (totalCount >= m_SegmentSize)
142  {
143  // Ignore any existing segments, just create a new one.
144  addSegment(buffer, m_SegmentSize);
145  numberCopied = m_SegmentSize;
146  }
147  else if (!m_Segments.count())
148  {
149  // Just add this segment.
150  addSegment(buffer, totalCount);
151  numberCopied = totalCount;
152  }
153  else
154  {
155  // Can we modify the most recent segment?
156  Segment *pSegment = m_Segments.popBack();
157  if (pSegment->size == m_SegmentSize)
158  {
159  m_Segments.pushBack(pSegment);
160 
161  // Full already, need a new segment.
162  addSegment(buffer, totalCount);
163  numberCopied = totalCount;
164  }
165  else
166  {
167  // There's room in this segment.
168  T *start = &pSegment->data[pSegment->size];
169  size_t availableSpace = m_SegmentSize - pSegment->size;
170  if (availableSpace > totalCount)
171  {
172  availableSpace = totalCount;
173  }
174  pedigree_std::copy(start, buffer, availableSpace);
175 
176  // We just added more bytes to this segment.
177  pSegment->size += availableSpace;
178 
179  // Done with this segment, so push it back for reading.
180  m_Segments.pushBack(pSegment);
181 
182  // Was that enough?
183  if (availableSpace < totalCount)
184  {
185  // No, need one more segment.
186  addSegment(
187  &buffer[availableSpace],
188  totalCount - availableSpace);
189  }
190 
191  // Done.
192  numberCopied = totalCount;
193  }
194  }
195 
196  countSoFar += numberCopied;
197  m_DataSize += numberCopied;
198  buffer += numberCopied;
199  totalCount -= numberCopied;
200  count -= numberCopied;
201 
202  if (!copiedData)
203  {
204  copiedData = numberCopied > 0;
205  }
206  }
207 
208  // Wake up a reader that was waiting for data.
209  // We do this here rather than after we finish as we may need to block
210  // again (e.g. if we're writing more than the size of the buffer), and
211  // a reader is needed to unblock that.
212  if (copiedData)
213  {
214  m_ReadCondition.signal();
215  }
216 
217  if (!count)
218  {
219  // Complete.
220  break;
221  }
222  }
223 
224  m_Lock.release();
225 
226  if (countSoFar)
227  {
228  // We've updated the buffer, so send events.
229  notifyMonitors();
230  }
231 
232  return countSoFar;
233 }
234 
235 template <class T, bool allowShortOperation>
236 size_t Buffer<T, allowShortOperation>::read(T *buffer, size_t count, bool block)
237 {
238  if (!block)
239  {
240  if (!m_Lock.tryAcquire())
241  {
242  // can't unlock buffer for writing
243  return 0;
244  }
245  }
246  else
247  {
248  // can block!
249  m_Lock.acquire();
250  }
251 
252  size_t countSoFar = 0;
253  while (true)
254  {
255  // Can we read?
256  if (!m_bCanRead)
257  {
258  // No! Maybe not anymore, so return what we've read so far.
259  break;
260  }
261 
262  // Do we have anything to read?
263  if (!m_DataSize)
264  {
265  if (!block)
266  {
267  // Cannot block!
268  break;
269  }
270 
271  // Can any writer get us out of this situation?
272  if (!m_bCanWrite)
273  {
274  // No. Return what we've read so far.
275  break;
276  }
277 
278  // No, we need to wait.
279  ConditionVariable::WaitResult result = m_ReadCondition.wait(m_Lock);
280  if (result.hasError())
281  {
282  return countSoFar;
283  }
284  continue;
285  }
286 
287  // Yes, we have room.
288  size_t totalCount = count;
289  if (totalCount > m_DataSize)
290  {
291  totalCount = m_DataSize;
292  }
293 
294  size_t numberCopied = 0;
295  while (m_Segments.count() && numberCopied < totalCount)
296  {
297  // Grab the first segment and read it.
298  Segment *pSegment = m_Segments.popFront();
299  size_t countToRead = pSegment->size - pSegment->reader;
300  if ((numberCopied + countToRead) > totalCount)
301  {
302  countToRead = totalCount - numberCopied;
303  }
304 
305  // Copy.
306  pedigree_std::copy(
307  buffer, &pSegment->data[pSegment->reader], countToRead);
308  pSegment->reader += countToRead;
309 
310  // Do we need to re-add it?
311  if (pSegment->reader < pSegment->size)
312  {
313  m_Segments.pushFront(pSegment);
314  }
315  else
316  {
317  delete pSegment;
318  }
319 
320  numberCopied += countToRead;
321  buffer += countToRead;
322  }
323 
324  m_DataSize -= numberCopied;
325  countSoFar += numberCopied;
326  count -= numberCopied;
327 
328  // We read some bytes so writers may be able to continue, which may be
329  // needed to unblock us if we loop back around and block.
330  if (numberCopied)
331  {
332  // Wake up a writer that was waiting for space to write.
333  m_WriteCondition.signal();
334  }
335 
336  if (!count)
337  {
338  break;
339  }
340 
341  // Once we've read at least some bytes, don't block - just return what
342  // we've read so far if we loop back around and have no data.
343  block = false;
344  }
345 
346  m_Lock.release();
347 
348  if (countSoFar)
349  {
350  // We've updated the buffer, so send events.
351  notifyMonitors();
352  }
353 
354  return countSoFar;
355 }
356 
357 template <class T, bool allowShortOperation>
359 {
360  LockGuard<Mutex> guard(m_Lock);
361  m_bCanWrite = false;
362 
363  // All pending readers need to now return.
364  m_ReadCondition.broadcast();
365 }
366 
367 template <class T, bool allowShortOperation>
369 {
370  LockGuard<Mutex> guard(m_Lock);
371  m_bCanRead = false;
372 
373  // All pending writers need to now return.
374  m_WriteCondition.broadcast();
375 }
376 
377 template <class T, bool allowShortOperation>
379 {
380  LockGuard<Mutex> guard(m_Lock);
381  bool previous = m_bCanWrite;
382  m_bCanWrite = true;
383  return previous;
384 }
385 
386 template <class T, bool allowShortOperation>
388 {
389  LockGuard<Mutex> guard(m_Lock);
390  bool previous = m_bCanRead;
391  m_bCanRead = true;
392  return previous;
393 }
394 
395 template <class T, bool allowShortOperation>
397 {
398  LockGuard<Mutex> guard(m_Lock);
399  return m_DataSize;
400 }
401 
402 template <class T, bool allowShortOperation>
404 {
405  return m_BufferSize;
406 }
407 
408 template <class T, bool allowShortOperation>
410 {
411  if (!block)
412  {
413  return m_bCanWrite && (m_DataSize < m_BufferSize);
414  }
415 
416  LockGuard<Mutex> guard(m_Lock);
417 
418  if (!m_bCanWrite)
419  {
420  return false;
421  }
422 
423  // We can get woken here if we stop being able to write.
424  while (m_bCanWrite && m_DataSize >= m_BufferSize)
425  {
426  ConditionVariable::WaitResult result = m_WriteCondition.wait(m_Lock);
427  if (result.hasError())
428  {
429  return false;
430  }
431  }
432 
433  return m_bCanWrite;
434 }
435 
436 template <class T, bool allowShortOperation>
438 {
439  if (!block)
440  {
441  return m_bCanRead && (m_DataSize > 0);
442  }
443 
444  LockGuard<Mutex> guard(m_Lock);
445 
446  if (!m_bCanRead)
447  {
448  return false;
449  }
450 
451  // We can get woken here if we stop being able to read.
452  while (m_bCanRead && !m_DataSize)
453  {
454  ConditionVariable::WaitResult result = m_ReadCondition.wait(m_Lock);
455  if (result.hasError())
456  {
458  return false;
459  }
460  }
461 
462  return m_bCanRead;
463 }
464 
465 template <class T, bool allowShortOperation>
467 {
468  LockGuard<Mutex> guard(m_Lock);
469 
470  // Wipe out every segment we own.
471  for (auto pSegment : m_Segments)
472  {
473  delete pSegment;
474  }
475  m_Segments.clear();
476  m_DataSize = 0;
477 
478  // Notify writers that might have been waiting for space.
479  m_WriteCondition.signal();
480 }
481 
482 template <class T, bool allowShortOperation>
484 {
485 #ifdef THREADS
486  LockGuard<Mutex> guard(m_Lock);
487  MonitorTarget *pTarget = new MonitorTarget(pThread, pEvent);
488  m_MonitorTargets.pushBack(pTarget);
489 #endif
490 }
491 
492 template <class T, bool allowShortOperation>
494 {
495 #ifdef THREADS
496  LockGuard<Mutex> guard(m_Lock);
497  MonitorTarget *pTarget = new MonitorTarget(pSemaphore);
498  m_MonitorTargets.pushBack(pTarget);
499 #endif
500 }
501 
502 template <class T, bool allowShortOperation>
504 {
505 #ifdef THREADS
506  LockGuard<Mutex> guard(m_Lock);
507  for (auto it = m_MonitorTargets.begin(); it != m_MonitorTargets.end(); ++it)
508  {
509  MonitorTarget *pMT = *it;
510 
511  if (pMT->pThread == pThread)
512  {
513  delete pMT;
514  m_MonitorTargets.erase(it);
515  it = m_MonitorTargets.begin();
516  if (it == m_MonitorTargets.end())
517  return;
518  }
519  }
520 #endif
521 }
522 
523 template <class T, bool allowShortOperation>
525 {
526 #ifdef THREADS
527  LockGuard<Mutex> guard(m_Lock);
528  for (auto it = m_MonitorTargets.begin(); it != m_MonitorTargets.end();)
529  {
530  MonitorTarget *pMT = *it;
531 
532  if (pMT->pSemaphore == pSemaphore)
533  {
534  delete pMT;
535  it = m_MonitorTargets.erase(it);
536  }
537  else
538  {
539  ++it;
540  }
541  }
542 #endif
543 }
544 
545 template <class T, bool allowShortOperation>
547 {
548 #ifdef THREADS
549  LockGuard<Mutex> guard(m_Lock);
550  for (auto it = m_MonitorTargets.begin(); it != m_MonitorTargets.end();)
551  {
552  MonitorTarget *pMT = *it;
553 
554  if (pMT->pEvent == pEvent)
555  {
556  delete pMT;
557  it = m_MonitorTargets.erase(it);
558  }
559  else
560  {
561  ++it;
562  }
563  }
564 #endif
565 }
566 
567 template <class T, bool allowShortOperation>
569 {
570 #ifdef THREADS
571  LockGuard<Mutex> guard(m_Lock);
572  for (typename List<MonitorTarget *>::Iterator it = m_MonitorTargets.begin();
573  it != m_MonitorTargets.end(); it++)
574  {
575  MonitorTarget *pMT = *it;
576 
577  if (pMT->pThread)
578  {
579  pMT->pThread->sendEvent(pMT->pEvent);
580  }
581  else if (pMT->pSemaphore)
582  {
583  pMT->pSemaphore->release();
584  }
585  delete pMT;
586  }
587  m_MonitorTargets.clear();
588 #endif
589 }
590 
591 template <class T, bool allowShortOperation>
592 void Buffer<T, allowShortOperation>::addSegment(const T *buffer, size_t count)
593 {
594  // Called with lock taken.
595  Segment *pNewSegment = new Segment();
596  pedigree_std::copy(pNewSegment->data, buffer, count);
597  pNewSegment->size = count;
598  m_Segments.pushBack(pNewSegment);
599 }
600 
601 template class Buffer<uint8_t, false>;
602 template class Buffer<uint8_t, true>;
603 template class Buffer<char, false>;
604 template class Buffer<char, true>;
605 template class Buffer<size_t, false>;
606 template class Buffer<size_t, true>;
void wipe()
Definition: Buffer.cc:466
bool canRead(bool block)
Definition: Buffer.cc:437
T data[m_SegmentSize]
Definition: Buffer.h:179
void addSegment(const T *buffer, size_t count)
Definition: Buffer.cc:592
void cullMonitorTargets(Thread *pThread)
Definition: Buffer.cc:503
void monitor(Thread *pThread, Event *pEvent)
Definition: Buffer.cc:483
bool canWrite(bool block)
Definition: Buffer.cc:409
size_t getDataSize()
Definition: Buffer.cc:396
size_t read(T *buffer, size_t count, bool block=true)
Definition: Buffer.cc:236
Definition: Result.h:36
bool enableWrites()
Definition: Buffer.cc:378
void notifyMonitors()
Definition: Buffer.cc:568
Definition: Buffer.h:49
bool sendEvent(Event *pEvent)
Definition: Thread.cc:529
void disableReads()
Definition: Buffer.cc:368
void release(size_t n=1)
Definition: Semaphore.cc:239
::Iterator< T, node_t > Iterator
Definition: List.h:71
Definition: Thread.h:54
Definition: Event.h:48
size_t size
Definition: Buffer.h:185
size_t write(const T *buffer, size_t count, bool block=true)
Definition: Buffer.cc:64
size_t getSize()
Definition: Buffer.cc:403
void disableWrites()
Definition: Buffer.cc:358
size_t reader
Definition: Buffer.h:182
bool enableReads()
Definition: Buffer.cc:387