The Pedigree Project  0.1
RequestQueue.cc
/*
 * Copyright (c) 2008-2014, Pedigree Developers
 *
 * Please see the CONTRIB file in the root of the source tree for a full
 * list of contributors.
 *
 * Permission to use, copy, modify, and distribute this software for any
 * purpose with or without fee is hereby granted, provided that the above
 * copyright notice and this permission notice appear in all copies.
 *
 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
 */

#include "pedigree/kernel/utilities/RequestQueue.h"
#include "pedigree/kernel/LockGuard.h"
#include "pedigree/kernel/Log.h"
#include "pedigree/kernel/machine/Machine.h"
#include "pedigree/kernel/machine/Timer.h"
#include "pedigree/kernel/process/Scheduler.h"
#include "pedigree/kernel/process/Thread.h"
#include "pedigree/kernel/processor/Processor.h"
#include "pedigree/kernel/processor/ProcessorInformation.h"
#include "pedigree/kernel/time/Time.h"
#include "pedigree/kernel/utilities/assert.h"
#include "pedigree/kernel/utilities/new"

class Process;

RequestQueue::RequestQueue(const String &name)
    : m_Stop(false),
#ifdef THREADS
      m_RequestQueueMutex(false), m_pThread(0), m_Halted(false),
#endif
      m_nMaxAsyncRequests(256), m_nAsyncRequests(0), m_Name(name)
{
    for (size_t i = 0; i < REQUEST_QUEUE_NUM_PRIORITIES; i++)
        m_pRequestQueue[i] = 0;

#ifdef THREADS
    m_OverrunChecker.queue = this;
#endif
}

RequestQueue::~RequestQueue()
{
    destroy();
}

void RequestQueue::initialise()
{
// Start the worker thread.
#ifdef THREADS
    if (m_pThread)
    {
        WARNING("RequestQueue initialised multiple times - don't do this.");
        return;
    }

    // Start RequestQueue workers in the kernel process only.
    Process *pProcess = Scheduler::instance().getKernelProcess();

    m_Stop = false;
    m_pThread =
        new Thread(pProcess, &trampoline, reinterpret_cast<void *>(this));
    m_Halted = false;

    // Add our timer so we can figure out if we're not keeping up with
    // synchronous requests
    Timer *t = Machine::instance().getTimer();
    if (t)
    {
        t->registerHandler(&m_OverrunChecker);
    }
#else
    WARNING("RequestQueue: This build does not support threads");
#endif
}

void RequestQueue::destroy()
{
#ifdef THREADS
    // Halt the queue - we're done.
    halt();

    // Clean up the queue in full.
    m_RequestQueueMutex.acquire();
    for (size_t i = 0; i < REQUEST_QUEUE_NUM_PRIORITIES; ++i)
    {
        Request *pRequest = m_pRequestQueue[i];

        // No more requests at this priority, we're cleaning up.
        m_pRequestQueue[i] = 0;

        while (pRequest)
        {
            // Cancel the request, let the owner clean up.
            pRequest->bReject = true;
            pRequest->mutex.release();
            pRequest = pRequest->next;
        }
    }
    m_RequestQueueMutex.release();

    Timer *t = Machine::instance().getTimer();
    if (t)
    {
        t->unregisterHandler(&m_OverrunChecker);
    }
#endif
}

uint64_t RequestQueue::addRequest(
    size_t priority, uint64_t p1, uint64_t p2, uint64_t p3, uint64_t p4,
    uint64_t p5, uint64_t p6, uint64_t p7, uint64_t p8)
{
    return addRequest(
        priority, RequestQueue::Block, p1, p2, p3, p4, p5, p6, p7, p8);
}

uint64_t RequestQueue::addRequest(
    size_t priority, ActionOnDuplicate action, uint64_t p1, uint64_t p2,
    uint64_t p3, uint64_t p4, uint64_t p5, uint64_t p6, uint64_t p7,
    uint64_t p8)
{
#ifdef THREADS
    // Create a new request object.
    Request *pReq = new Request();
    pReq->p1 = p1;
    pReq->p2 = p2;
    pReq->p3 = p3;
    pReq->p4 = p4;
    pReq->p5 = p5;
    pReq->p6 = p6;
    pReq->p7 = p7;
    pReq->p8 = p8;
    pReq->next = 0;
    pReq->bReject = false;
    pReq->refcnt = 1;
    pReq->owner = this;
    pReq->priority = priority;

    // Do we own pReq?
    bool bOwnRequest = true;

    // Add to the request queue.
    m_RequestQueueMutex.acquire();

    if (m_pRequestQueue[priority] == 0)
        m_pRequestQueue[priority] = pReq;
    else
    {
        Request *p = m_pRequestQueue[priority];
        while (p->next != 0)
        {
            // Wait for duplicates instead of re-inserting, if the compare
            // function is defined.
            if (compareRequests(*p, *pReq) && action != NewRequest)
            {
                bOwnRequest = false;
                delete pReq;
                pReq = p;
                break;
            }
            p = p->next;
        }

        if (bOwnRequest && compareRequests(*p, *pReq) && action != NewRequest)
        {
            bOwnRequest = false;
            delete pReq;
            pReq = p;
        }
        else if (bOwnRequest)
            p->next = pReq;
    }

    if (!bOwnRequest)
    {
        if (action == ReturnImmediately)
        {
            m_RequestQueueMutex.release();
            return 0;
        }
        ++pReq->refcnt;
    }
    else
    {
        pReq->pThread = Processor::information().getCurrentThread();
        pReq->pThread->addRequest(pReq);
    }

    ++m_nTotalRequests;

    // One more item now available.
    m_RequestQueueCondition.signal();
    m_RequestQueueMutex.release();

    // We are waiting on the worker thread - mark the thread as such.
    Thread *pThread = Processor::information().getCurrentThread();
    pThread->setBlockingThread(m_pThread);

    if (pReq->bReject)
    {
        // Hmm, in the time the RequestQueueMutex was being acquired, we got
        // pre-empted, and then an unexpected exit event happened. The request
        // is to be rejected, so don't acquire the mutex at all.
        if (!--pReq->refcnt)
            delete pReq;
        return 0;
    }

    // Wait for the request to be satisfied. This should sleep the thread.
    pReq->mutex.acquire();

    m_RequestQueueMutex.acquire();
    --m_nTotalRequests;
    m_RequestQueueMutex.release();

    // Don't use the Thread object if it may be already freed
    if (!pReq->bReject)
        pThread->setBlockingThread(0);

    if (pReq->bReject || pThread->wasInterrupted() ||
        pThread->getUnwindState() == Thread::Exit)
    {
        // The request was interrupted somehow. We cannot assume that pReq's
        // contents are valid, so just return zero. The caller may have to redo
        // their request.
        // By releasing here, the worker thread can detect that the request was
        // interrupted and clean up by itself.
        NOTICE("RequestQueue::addRequest - interrupted");
        if (pReq->bReject && !--pReq->refcnt)
            delete pReq;  // Safe to delete, unexpected exit condition
        else
            pReq->mutex.release();
        return 0;
    }

    // Grab the result.
    uintptr_t ret = pReq->ret;

    // Delete the request structure.
    if (bOwnRequest && pReq->pThread)
        pReq->pThread->removeRequest(pReq);
    if (!--pReq->refcnt)
        delete pReq;
    else
        pReq->mutex.release();

    return ret;
#else
    return executeRequest(p1, p2, p3, p4, p5, p6, p7, p8);
#endif
}
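
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original file: how a caller typically
// submits a blocking request. The function name and parameter values below are
// hypothetical; only RequestQueue::addRequest() itself comes from this class.
// The call sleeps the calling thread until the worker thread has executed the
// request and released the request's mutex (or until the request is rejected).
// ---------------------------------------------------------------------------
static uint64_t exampleBlockingSubmit(RequestQueue &queue)
{
    // Priority 0 is serviced first (getNextRequest() scans priorities in
    // ascending order); the remaining parameters default to 0.
    uint64_t result = queue.addRequest(0, /* p1 */ 0x1000, /* p2 */ 512);

    // A zero return can also mean "rejected or interrupted", so callers treat
    // it as a failure/retry indication rather than a guaranteed result value.
    return result;
}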

int RequestQueue::doAsync(void *p)
{
    RequestQueue::Request *pReq = reinterpret_cast<RequestQueue::Request *>(p);

    ++(pReq->owner->m_nAsyncRequests);

    // Just return if the request is a duplicate, as our caller doesn't care and
    // we're otherwise just using up another thread stack and burning time for
    // no real reason.
    uint64_t result = pReq->owner->addRequest(
        pReq->priority, ReturnImmediately, pReq->p1, pReq->p2, pReq->p3,
        pReq->p4, pReq->p5, pReq->p6, pReq->p7, pReq->p8);

    --(pReq->owner->m_nAsyncRequests);

    delete pReq;

    return 0;
}

uint64_t RequestQueue::addAsyncRequest(
    size_t priority, uint64_t p1, uint64_t p2, uint64_t p3, uint64_t p4,
    uint64_t p5, uint64_t p6, uint64_t p7, uint64_t p8)
{
#ifndef THREADS
    return addRequest(priority, p1, p2, p3, p4, p5, p6, p7, p8);
#else
    // Create a new request object.
    Request *pReq = new Request();
    pReq->p1 = p1;
    pReq->p2 = p2;
    pReq->p3 = p3;
    pReq->p4 = p4;
    pReq->p5 = p5;
    pReq->p6 = p6;
    pReq->p7 = p7;
    pReq->p8 = p8;
    pReq->next = 0;
    pReq->bReject = false;
    pReq->refcnt = 0;
    pReq->owner = this;
    pReq->priority = priority;

    // We cannot block, so we just have to drop the request if the queue is
    // already overloaded with async requests.
    if (m_nAsyncRequests >= m_nMaxAsyncRequests)
    {
        ERROR(
            "RequestQueue: '" << m_Name
                              << "' is not keeping up with demand for "
                                 "async requests");
        ERROR(
            " -> priority=" << priority << ", p1=" << Hex << p1 << ", p2=" << p2
                            << ", p3=" << p3 << ", p4=" << p4);
        ERROR(
            " -> p5=" << Hex << p5 << ", p6=" << p6 << ", p7=" << p7
                      << ", p8=" << p8);
        delete pReq;
        return 0;
    }

    // Add to RequestQueue.
    Process *pProcess = Scheduler::instance().getKernelProcess();
    Thread *pThread =
        new Thread(pProcess, &doAsync, reinterpret_cast<void *>(pReq));
    pThread->detach();
#endif

    return 0;
}
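
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original file: fire-and-forget use of
// addAsyncRequest(). The function name and values are hypothetical. The async
// path always returns 0, runs the request on a detached kernel thread, and
// drops the request (logging an ERROR) once m_nMaxAsyncRequests are in flight.
// ---------------------------------------------------------------------------
static void exampleAsyncSubmit(RequestQueue &queue)
{
    // The detached doAsync() thread re-submits this as a normal request with
    // ReturnImmediately, so duplicates are coalesced rather than re-queued.
    queue.addAsyncRequest(0, /* p1 */ 0x2000);
}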

void RequestQueue::halt()
{
#ifdef THREADS
    m_RequestQueueMutex.acquire();
    if (!m_Halted)
    {
        m_Stop = true;
        m_RequestQueueCondition.broadcast();

        // Join now - we need to release the mutex so the worker thread can keep
        // going, as it could be blocked on trying to acquire it right now.
        m_RequestQueueMutex.release();
        m_pThread->join();
        m_RequestQueueMutex.acquire();

        m_pThread = 0;
        m_Halted = true;
    }
    m_RequestQueueMutex.release();
#endif
}

void RequestQueue::resume()
{
#ifdef THREADS
    if (m_Halted)
    {
        initialise();
    }
#endif
}

int RequestQueue::trampoline(void *p)
{
    RequestQueue *pRQ = reinterpret_cast<RequestQueue *>(p);
    return pRQ->work();
}

RequestQueue::Request *RequestQueue::getNextRequest()
{
#ifdef THREADS
    // Must have the lock to be here.
    assert(!m_RequestQueueMutex.getValue());
#endif

    // Get the most important queue with data in.
    size_t priority = 0;
    bool bFound = false;
    for (priority = 0; priority < REQUEST_QUEUE_NUM_PRIORITIES - 1; priority++)
    {
        if (m_pRequestQueue[priority])
        {
            bFound = true;
            break;
        }
    }

    if (!bFound)
    {
        return 0;
    }

    Request *pReq = m_pRequestQueue[priority];
    if (pReq != 0)
    {
        m_pRequestQueue[priority] = pReq->next;
    }

    return pReq;
}

int RequestQueue::work()
{
#ifdef THREADS
    // Hold from the start - this will be released by the condition variable
    // wait for us, and re-acquired on return, so we'll always have the lock
    // until we explicitly release it.
    m_RequestQueueMutex.acquire();
    while (true)
    {
        // Do we need to stop?
        if (m_Stop)
        {
            m_RequestQueueMutex.release();
            return 0;
        }

        Request *pReq = getNextRequest();
        if (!pReq)
        {
            // Need to wait for another request.
            m_RequestQueueCondition.wait(m_RequestQueueMutex);
            continue;
        }

        // We have a request! We don't need to use the queue anymore.
        m_RequestQueueMutex.release();

        // Verify that it's still valid to run the request
        if (!pReq->bReject)
        {
            // Perform the request.
            bool finished = true;
            pReq->ret = executeRequest(
                pReq->p1, pReq->p2, pReq->p3, pReq->p4, pReq->p5, pReq->p6,
                pReq->p7, pReq->p8);
            if (pReq->mutex.tryAcquire())
            {
                // Something's gone wrong - the calling thread has released the
                // Mutex. Destroy the request and grab the next request from the
                // queue. The calling thread has long since stopped caring about
                // whether we're done or not.
                NOTICE("RequestQueue::work - caller interrupted");
                if (pReq->pThread)
                    pReq->pThread->removeRequest(pReq);
                finished = false;

                m_RequestQueueMutex.acquire();
                continue;
            }
            switch (
                Processor::information().getCurrentThread()->getUnwindState())
            {
                case Thread::Continue:
                    break;
                case Thread::Exit:
                    WARNING("RequestQueue: unwind state is Exit, request not "
                            "cleaned up. Leak?");
                    return 0;
                case Thread::ReleaseBlockingThread:
                    Processor::information().getCurrentThread()->setUnwindState(
                        Thread::Continue);
                    break;
            }

            // Request finished - post the request's mutex to wake the calling
            // thread.
            if (finished)
            {
                pReq->bCompleted = true;
                pReq->mutex.release();
            }
        }

        // Acquire mutex ready to re-check condition.
        // We do this here as the head of the loop must have the lock (to allow
        // the condition variable to work with our lock correctly).
        m_RequestQueueMutex.acquire();
    }
#else
    return 0;
#endif
}
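
// ---------------------------------------------------------------------------
// Illustrative analogy, not part of the original file and kept out of the
// build: the same worker-loop shape as work() above, expressed with C++
// standard library primitives for readers who know that vocabulary better than
// the kernel's Mutex/ConditionVariable. All names below are hypothetical.
// ---------------------------------------------------------------------------
#if 0
#include <condition_variable>
#include <mutex>
#include <queue>

static std::mutex exampleLock;
static std::condition_variable exampleCondition;
static std::queue<int> exampleWork;
static bool exampleStop = false;

static void exampleWorker()
{
    // Hold the lock at the head of the loop, as work() does.
    std::unique_lock<std::mutex> lock(exampleLock);
    while (true)
    {
        if (exampleStop)
            return;

        if (exampleWork.empty())
        {
            // Waiting releases the lock and re-acquires it on wake-up.
            exampleCondition.wait(lock);
            continue;
        }

        int item = exampleWork.front();
        exampleWork.pop();

        // Drop the lock while executing the request, then take it back before
        // re-checking the queue - mirroring the release()/acquire() pairing in
        // work() above.
        lock.unlock();
        (void) item;  // the executeRequest() equivalent would run here
        lock.lock();
    }
}
#endif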

#ifdef THREADS
void RequestQueue::RequestQueueOverrunChecker::timer(
    uint64_t delta, InterruptState &)
{
    m_Tick += delta;
    if (m_Tick < Time::Multiplier::Second)
    {
        return;
    }

    m_Tick -= Time::Multiplier::Second;

    queue->m_RequestQueueMutex.acquire();
    size_t lastSize = m_LastQueueSize;
    size_t currentSize = queue->m_nTotalRequests;
    m_LastQueueSize = currentSize;
    queue->m_RequestQueueMutex.release();

    if (lastSize < currentSize)
    {
        FATAL(
            "RequestQueue '"
            << queue->m_Name
            << "' is NOT keeping up with incoming requests [1s ago we had "
            << lastSize << " requests, now have " << currentSize << "]!");
    }
}
#endif

bool RequestQueue::isRequestValid(const Request *r)
{
#ifdef THREADS
    // Halted RequestQueue already has the RequestQueue mutex held.
    LockGuard<Mutex> guard(m_RequestQueueMutex, !m_Halted);
#endif

    for (size_t priority = 0; priority < REQUEST_QUEUE_NUM_PRIORITIES - 1;
         ++priority)
    {
        Request *pReq = m_pRequestQueue[priority];
        while (pReq)
        {
            if (pReq == r)
                return true;

            pReq = pReq->next;
        }
    }

    return false;
}
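
// ---------------------------------------------------------------------------
// Illustrative sketch, not part of the original file: a minimal concrete queue
// assuming only what this class exposes. ExampleDiskQueue, its parameter
// convention, and exampleQueueLifecycle() are hypothetical; real users of this
// class (e.g. disk drivers) define their own executeRequest() with their own
// meanings for p1..p8.
// ---------------------------------------------------------------------------
class ExampleDiskQueue : public RequestQueue
{
  public:
    ExampleDiskQueue() : RequestQueue(String("example-disk"))
    {
    }

    // Runs on the worker thread for every queued request; p1..p8 arrive
    // exactly as the caller passed them to addRequest()/addAsyncRequest().
    virtual uint64_t executeRequest(
        uint64_t p1, uint64_t p2, uint64_t p3, uint64_t p4, uint64_t p5,
        uint64_t p6, uint64_t p7, uint64_t p8)
    {
        // Hypothetical convention: p1 = operation, p2 = sector, p3 = count.
        return p2 + p3;  // placeholder result handed back to addRequest()
    }
};

// Typical lifecycle as the code above expects it: construct, initialise() to
// start the worker thread, submit requests, and let the destructor (which
// calls destroy()) halt the worker on teardown.
static void exampleQueueLifecycle()
{
    ExampleDiskQueue queue;
    queue.initialise();

    uint64_t result = queue.addRequest(0, 1, 2, 3);
    (void) result;  // 0 can also indicate a rejected or interrupted request
}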