RaiseAsync.c 37.1 KB
Newer Older
1 2 3 4 5 6 7 8 9 10
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 1998-2006
 *
 * Asynchronous exceptions
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
Simon Marlow's avatar
Simon Marlow committed
11 12

#include "sm/Storage.h"
13 14 15
#include "Threads.h"
#include "Trace.h"
#include "RaiseAsync.h"
16
#include "Schedule.h"
17 18
#include "Updates.h"
#include "STM.h"
Simon Marlow's avatar
Simon Marlow committed
19
#include "sm/Sanity.h"
20
#include "Profiling.h"
21
#include "Messages.h"
22 23 24
#if defined(mingw32_HOST_OS)
#include "win32/IOManager.h"
#endif
25

26 27 28
static void blockedThrowTo (Capability *cap,
                            StgTSO *target, MessageThrowTo *msg);

29 30
static void removeFromQueues(Capability *cap, StgTSO *tso);

31 32
static void removeFromMVarBlockedQueue (StgTSO *tso);

33
static void throwToSendMsg (Capability *cap USED_IF_THREADS,
34
                            Capability *target_cap USED_IF_THREADS,
35 36
                            MessageThrowTo *msg USED_IF_THREADS);

37 38 39 40 41
/* -----------------------------------------------------------------------------
   throwToSingleThreaded

   This version of throwTo is safe to use if and only if one of the
   following holds:
42

43 44 45 46 47 48 49 50 51 52 53
     - !THREADED_RTS

     - all the other threads in the system are stopped (eg. during GC).

     - we surely own the target TSO (eg. we just took it from the
       run queue of the current capability, or we are running it).

   It doesn't cater for blocking the source thread until the exception
   has been raised.
   -------------------------------------------------------------------------- */

Simon Marlow's avatar
Simon Marlow committed
54
static void
55
throwToSingleThreaded__ (Capability *cap, StgTSO *tso, StgClosure *exception,
Ben Gamari's avatar
Ben Gamari committed
56
                         bool stop_at_atomically, StgUpdateFrame *stop_here)
57 58 59
{
    // Thread already dead?
    if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
60
        return;
61 62 63 64 65
    }

    // Remove it from any blocking queues
    removeFromQueues(cap,tso);

Simon Marlow's avatar
Simon Marlow committed
66
    raiseAsync(cap, tso, exception, stop_at_atomically, stop_here);
67 68 69
}

void
Simon Marlow's avatar
Simon Marlow committed
70
throwToSingleThreaded (Capability *cap, StgTSO *tso, StgClosure *exception)
71
{
Ben Gamari's avatar
Ben Gamari committed
72
    throwToSingleThreaded__(cap, tso, exception, false, NULL);
Simon Marlow's avatar
Simon Marlow committed
73
}
74

Simon Marlow's avatar
Simon Marlow committed
75
void
76
throwToSingleThreaded_ (Capability *cap, StgTSO *tso, StgClosure *exception,
Ben Gamari's avatar
Ben Gamari committed
77
                        bool stop_at_atomically)
Simon Marlow's avatar
Simon Marlow committed
78 79 80
{
    throwToSingleThreaded__ (cap, tso, exception, stop_at_atomically, NULL);
}
81

82
void // cannot return a different TSO
Simon Marlow's avatar
Simon Marlow committed
83 84
suspendComputation (Capability *cap, StgTSO *tso, StgUpdateFrame *stop_here)
{
Ben Gamari's avatar
Ben Gamari committed
85
    throwToSingleThreaded__ (cap, tso, NULL, false, stop_here);
86 87
}

88 89 90 91 92 93 94 95
/* -----------------------------------------------------------------------------
   throwToSelf

   Useful for throwing an async exception in a thread from the
   runtime.  It handles unlocking the throwto message returned by
   throwTo().

   Note [Throw to self when masked]
96

97 98 99 100 101 102 103 104 105
   When a StackOverflow occurs when the thread is masked, we want to
   defer the exception to when the thread becomes unmasked/hits an
   interruptible point.  We already have a mechanism for doing this,
   the blocked_exceptions list, but the use here is a bit unusual,
   because an exception is normally only added to this list upon
   an asynchronous 'throwTo' call (with all of the relevant
   multithreaded nonsense). Morally, a stack overflow should be an
   asynchronous exception sent by a thread to itself, and it should
   have the same semantics.  But there are a few key differences:
106

107 108 109 110
   - If you actually tried to send an asynchronous exception to
     yourself using throwTo, the exception would actually immediately
     be delivered.  This is because throwTo itself is considered an
     interruptible point, so the exception is always deliverable. Thus,
111
     ordinarily, we never end up with a message to oneself in the
112
     blocked_exceptions queue.
113

114 115 116 117 118 119
   - In the case of a StackOverflow, we don't actually care about the
     wakeup semantics; when an exception is delivered, the thread that
     originally threw the exception should be woken up, since throwTo
     blocks until the exception is successfully thrown.  Fortunately,
     it is harmless to wakeup a thread that doesn't actually need waking
     up, e.g. ourselves.
120

121 122 123 124
   - No synchronization is necessary, because we own the TSO and the
     capability.  You can observe this by tracing through the execution
     of throwTo.  We skip synchronizing the message and inter-capability
     communication.
125

126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
   We think this doesn't break any invariants, but do be careful!
   -------------------------------------------------------------------------- */

void
throwToSelf (Capability *cap, StgTSO *tso, StgClosure *exception)
{
    MessageThrowTo *m;

    m = throwTo(cap, tso, tso, exception);

    if (m != NULL) {
        // throwTo leaves it locked
        unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
    }
}

142 143 144 145 146 147 148 149 150
/* -----------------------------------------------------------------------------
   throwTo

   This function may be used to throw an exception from one thread to
   another, during the course of normal execution.  This is a tricky
   task: the target thread might be running on another CPU, or it
   may be blocked and could be woken up at any point by another CPU.
   We have some delicate synchronisation to do.

151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171
   The underlying scheme when multiple Capabilities are in use is
   message passing: when the target of a throwTo is on another
   Capability, we send a message (a MessageThrowTo closure) to that
   Capability.

   If the throwTo needs to block because the target TSO is masking
   exceptions (the TSO_BLOCKEX flag), then the message is placed on
   the blocked_exceptions queue attached to the target TSO.  When the
   target TSO enters the unmasked state again, it must check the
   queue.  The blocked_exceptions queue is not locked; only the
   Capability owning the TSO may modify it.

   To make things simpler for throwTo, we always create the message
   first before deciding what to do.  The message may get sent, or it
   may get attached to a TSO's blocked_exceptions queue, or the
   exception may get thrown immediately and the message dropped,
   depending on the current state of the target.

   Currently we send a message if the target belongs to another
   Capability, and it is

172
     - NotBlocked, BlockedOnMsgThrowTo,
173
       BlockedOnCCall_Interruptible
174 175 176 177 178 179 180 181

     - or it is masking exceptions (TSO_BLOCKEX)

   Currently, if the target is BlockedOnMVar, BlockedOnSTM, or
   BlockedOnBlackHole then we acquire ownership of the TSO by locking
   its parent container (e.g. the MVar) and then raise the exception.
   We might change these cases to be more message-passing-like in the
   future.
182 183

   Returns:
184

185
   NULL               exception was raised, ok to continue
186

187
   MessageThrowTo *   exception was not raised; the source TSO
188
                      should now put itself in the state
189
                      BlockedOnMsgThrowTo, and when it is ready
190
                      it should unlock the message using
191 192 193
                      unlockClosure(msg, &stg_MSG_THROWTO_info);
                      If it decides not to raise the exception after
                      all, it can revoke it safely with
194
                      unlockClosure(msg, &stg_MSG_NULL_info);
195 196 197

   -------------------------------------------------------------------------- */

198
MessageThrowTo *
199 200 201 202
throwTo (Capability *cap,       // the Capability we hold
         StgTSO *source,        // the TSO sending the exception (or NULL)
         StgTSO *target,        // the TSO receiving the exception
         StgClosure *exception) // the exception closure
203 204 205 206
{
    MessageThrowTo *msg;

    msg = (MessageThrowTo *) allocate(cap, sizeofW(MessageThrowTo));
207
    // the message starts locked; see below
208
    SET_HDR(msg, &stg_WHITEHOLE_info, CCS_SYSTEM);
209 210 211 212 213 214 215
    msg->source      = source;
    msg->target      = target;
    msg->exception   = exception;

    switch (throwToMsg(cap, msg))
    {
    case THROWTO_SUCCESS:
216 217 218
        // unlock the message now, otherwise we leave a WHITEHOLE in
        // the heap (#6103)
        SET_HDR(msg, &stg_MSG_THROWTO_info, CCS_SYSTEM);
219
        return NULL;
220

221 222
    case THROWTO_BLOCKED:
    default:
223 224 225
        // the caller will unlock the message when it is ready.  We
        // cannot unlock it yet, because the calling thread will need
        // to tidy up its state first.
226 227 228
        return msg;
    }
}
229

230

231
uint32_t
232
throwToMsg (Capability *cap, MessageThrowTo *msg)
233 234
{
    StgWord status;
235
    StgTSO *target = msg->target;
236
    Capability *target_cap;
237

238 239 240 241 242 243 244
    goto check_target;

retry:
    write_barrier();
    debugTrace(DEBUG_sched, "throwTo: retrying...");

check_target:
Simon Marlow's avatar
Simon Marlow committed
245 246
    ASSERT(target != END_TSO_QUEUE);

247
    // Thread already dead?
248 249 250
    if (target->what_next == ThreadComplete
        || target->what_next == ThreadKilled) {
        return THROWTO_SUCCESS;
251 252
    }

253 254
    debugTraceCap(DEBUG_sched, cap,
                  "throwTo: from thread %lu to thread %lu",
255
                  (unsigned long)msg->source->id,
256
                  (unsigned long)msg->target->id);
257

Ben Gamari's avatar
Ben Gamari committed
258
#if defined(DEBUG)
259
    traceThreadStatus(DEBUG_sched, target);
260 261
#endif

262 263 264 265
    target_cap = target->cap;
    if (target->cap != cap) {
        throwToSendMsg(cap, target_cap, msg);
        return THROWTO_BLOCKED;
266 267 268
    }

    status = target->why_blocked;
269

270 271 272
    switch (status) {
    case NotBlocked:
    {
273 274
        if ((target->flags & TSO_BLOCKEX) == 0) {
            // It's on our run queue and not blocking exceptions
Ben Gamari's avatar
Ben Gamari committed
275
            raiseAsync(cap, target, msg->exception, false, NULL);
276
            return THROWTO_SUCCESS;
277
        } else {
278 279
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314
        }
    }

    case BlockedOnMsgThrowTo:
    {
        const StgInfoTable *i;
        MessageThrowTo *m;

        m = target->block_info.throwto;

        // target is local to this cap, but has sent a throwto
        // message to another cap.
        //
        // The source message is locked.  We need to revoke the
        // target's message so that we can raise the exception, so
        // we attempt to lock it.

        // There's a possibility of a deadlock if two threads are both
        // trying to throwTo each other (or more generally, a cycle of
        // threads).  To break the symmetry we compare the addresses
        // of the MessageThrowTo objects, and the one for which m <
        // msg gets to spin, while the other can only try to lock
        // once, but must then back off and unlock both before trying
        // again.
        if (m < msg) {
            i = lockClosure((StgClosure *)m);
        } else {
            i = tryLockClosure((StgClosure *)m);
            if (i == NULL) {
//            debugBelch("collision\n");
                throwToSendMsg(cap, target->cap, msg);
                return THROWTO_BLOCKED;
            }
        }

315 316 317 318 319
        if (i == &stg_MSG_NULL_info) {
            // we know there's a MSG_TRY_WAKEUP on the way, so we
            // might as well just do it now.  The message will
            // be a no-op when it arrives.
            unlockClosure((StgClosure*)m, i);
320
            tryWakeupThread(cap, target);
321 322 323
            goto retry;
        }

324
        if (i != &stg_MSG_THROWTO_info) {
325
            // if it's a MSG_NULL, this TSO has been woken up by another Cap
326 327 328 329
            unlockClosure((StgClosure*)m, i);
            goto retry;
        }

330 331
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
332 333 334 335 336 337
            unlockClosure((StgClosure*)m, i);
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        }

        // nobody else can wake up this TSO after we claim the message
338
        doneWithMsgThrowTo(m);
339

Ben Gamari's avatar
Ben Gamari committed
340
        raiseAsync(cap, target, msg->exception, false, NULL);
341
        return THROWTO_SUCCESS;
342 343 344
    }

    case BlockedOnMVar:
345
    case BlockedOnMVarRead:
346
    {
347 348 349 350 351 352 353 354 355 356 357
        /*
          To establish ownership of this TSO, we need to acquire a
          lock on the MVar that it is blocked on.
        */
        StgMVar *mvar;
        StgInfoTable *info USED_IF_THREADS;

        mvar = (StgMVar *)target->block_info.closure;

        // ASSUMPTION: tso->block_info must always point to a
        // closure.  In the threaded RTS it does.
358
        switch (get_itbl((StgClosure *)mvar)->type) {
359 360 361 362 363 364
        case MVAR_CLEAN:
        case MVAR_DIRTY:
            break;
        default:
            goto retry;
        }
365

366
        info = lockClosure((StgClosure *)mvar);
367

368
        // we have the MVar, let's check whether the thread
369 370 371 372 373 374
        // is still blocked on the same MVar.
        if ((target->why_blocked != BlockedOnMVar && target->why_blocked != BlockedOnMVarRead)
            || (StgMVar *)target->block_info.closure != mvar) {
            unlockClosure((StgClosure *)mvar, info);
            goto retry;
        }
375

376 377 378 379 380
        if (target->_link == END_TSO_QUEUE) {
            // the MVar operation has already completed.  There is a
            // MSG_TRY_WAKEUP on the way, but we can just wake up the
            // thread now anyway and ignore the message when it
            // arrives.
381
            unlockClosure((StgClosure *)mvar, info);
382
            tryWakeupThread(cap, target);
383 384 385
            goto retry;
        }

386 387
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
388
            blockedThrowTo(cap,target,msg);
389 390 391
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_BLOCKED;
        } else {
392 393
            // revoke the MVar operation
            removeFromMVarBlockedQueue(target);
Ben Gamari's avatar
Ben Gamari committed
394
            raiseAsync(cap, target, msg->exception, false, NULL);
395 396 397
            unlockClosure((StgClosure *)mvar, info);
            return THROWTO_SUCCESS;
        }
398 399 400 401
    }

    case BlockedOnBlackHole:
    {
402
        if (target->flags & TSO_BLOCKEX) {
403 404
            // BlockedOnBlackHole is not interruptible.
            blockedThrowTo(cap,target,msg);
405 406
            return THROWTO_BLOCKED;
        } else {
407 408 409 410 411 412
            // Revoke the message by replacing it with IND. We're not
            // locking anything here, so we might still get a TRY_WAKEUP
            // message from the owner of the blackhole some time in the
            // future, but that doesn't matter.
            ASSERT(target->block_info.bh->header.info == &stg_MSG_BLACKHOLE_info);
            OVERWRITE_INFO(target->block_info.bh, &stg_IND_info);
Ben Gamari's avatar
Ben Gamari committed
413
            raiseAsync(cap, target, msg->exception, false, NULL);
414 415
            return THROWTO_SUCCESS;
        }
416 417 418
    }

    case BlockedOnSTM:
419 420
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
421
            blockedThrowTo(cap,target,msg);
422 423
            return THROWTO_BLOCKED;
        } else {
Ben Gamari's avatar
Ben Gamari committed
424
            raiseAsync(cap, target, msg->exception, false, NULL);
425 426
            return THROWTO_SUCCESS;
        }
427

428
    case BlockedOnCCall_Interruptible:
Ben Gamari's avatar
Ben Gamari committed
429
#if defined(THREADED_RTS)
430 431 432 433 434 435 436 437 438 439 440
    {
        Task *task = NULL;
        // walk suspended_ccalls to find the correct worker thread
        InCall *incall;
        for (incall = cap->suspended_ccalls; incall != NULL; incall = incall->next) {
            if (incall->suspended_tso == target) {
                task = incall->task;
                break;
            }
        }
        if (task != NULL) {
441
            blockedThrowTo(cap, target, msg);
Simon Marlow's avatar
Simon Marlow committed
442 443
            if (!((target->flags & TSO_BLOCKEX) &&
                  ((target->flags & TSO_INTERRUPTIBLE) == 0))) {
444 445 446
                interruptWorkerTask(task);
            }
            return THROWTO_BLOCKED;
447 448 449 450 451
        } else {
            debugTraceCap(DEBUG_sched, cap, "throwTo: could not find worker thread to kill");
        }
        // fall to next
    }
Ben Gamari's avatar
Ben Gamari committed
452
    FALLTHROUGH;
453
#endif
454
    case BlockedOnCCall:
455 456
        blockedThrowTo(cap,target,msg);
        return THROWTO_BLOCKED;
457

Ben Gamari's avatar
Ben Gamari committed
458
#if !defined(THREADEDED_RTS)
459 460 461
    case BlockedOnRead:
    case BlockedOnWrite:
    case BlockedOnDelay:
462 463 464
#if defined(mingw32_HOST_OS)
    case BlockedOnDoProc:
#endif
465 466 467 468 469 470
        if ((target->flags & TSO_BLOCKEX) &&
            ((target->flags & TSO_INTERRUPTIBLE) == 0)) {
            blockedThrowTo(cap,target,msg);
            return THROWTO_BLOCKED;
        } else {
            removeFromQueues(cap,target);
Ben Gamari's avatar
Ben Gamari committed
471
            raiseAsync(cap, target, msg->exception, false, NULL);
472 473
            return THROWTO_SUCCESS;
        }
474 475
#endif

476
    case ThreadMigrating:
Gabor Greif's avatar
Gabor Greif committed
477
        // if it is ThreadMigrating and tso->cap is ours, then it
478 479 480 481 482 483 484 485 486 487
        // *must* be migrating *to* this capability.  If it were
        // migrating away from the capability, then tso->cap would
        // point to the destination.
        //
        // There is a MSG_WAKEUP in the message queue for this thread,
        // but we can just do it preemptively:
        tryWakeupThread(cap, target);
        // and now retry, the thread should be runnable.
        goto retry;

488
    default:
489
        barf("throwTo: unrecognised why_blocked (%d)", target->why_blocked);
490 491 492 493 494
    }
    barf("throwTo");
}

static void
495
throwToSendMsg (Capability *cap STG_UNUSED,
496
                Capability *target_cap USED_IF_THREADS,
497
                MessageThrowTo *msg USED_IF_THREADS)
498

499
{
Ben Gamari's avatar
Ben Gamari committed
500
#if defined(THREADED_RTS)
501
    debugTraceCap(DEBUG_sched, cap, "throwTo: sending a throwto message to cap %lu", (unsigned long)target_cap->no);
502

503
    sendMessage(cap, target_cap, (Message*)msg);
504 505
#endif
}
506

507 508 509
// Block a throwTo message on the target TSO's blocked_exceptions
// queue.  The current Capability must own the target TSO in order to
// modify the blocked_exceptions queue.
510
void
511
blockedThrowTo (Capability *cap, StgTSO *target, MessageThrowTo *msg)
512
{
513 514 515 516 517
    debugTraceCap(DEBUG_sched, cap, "throwTo: blocking on thread %lu",
                  (unsigned long)target->id);

    ASSERT(target->cap == cap);

518
    dirty_TSO(cap,target); // we will modify the blocked_exceptions queue
519
    msg->link = target->blocked_exceptions;
520
    target->blocked_exceptions = msg;
521 522 523 524 525 526 527 528 529 530 531 532 533 534 535
}

/* -----------------------------------------------------------------------------
   Waking up threads blocked in throwTo

   There are two ways to do this: maybePerformBlockedException() will
   perform the throwTo() for the thread at the head of the queue
   immediately, and leave the other threads on the queue.
   maybePerformBlockedException() also checks the TSO_BLOCKEX flag
   before raising an exception.

   awakenBlockedExceptionQueue() will wake up all the threads in the
   queue, but not perform any throwTo() immediately.  This might be
   more appropriate when the target thread is the one actually running
   (see Exception.cmm).
536 537

   Returns: non-zero if an exception was raised, zero otherwise.
538 539
   -------------------------------------------------------------------------- */

540
int
541 542
maybePerformBlockedException (Capability *cap, StgTSO *tso)
{
543 544
    MessageThrowTo *msg;
    const StgInfoTable *i;
545 546
    StgTSO *source;

547
    if (tso->what_next == ThreadComplete || tso->what_next == ThreadFinished) {
548
        if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE) {
549 550 551 552 553 554 555
            awakenBlockedExceptionQueue(cap,tso);
            return 1;
        } else {
            return 0;
        }
    }

556
    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE &&
Simon Marlow's avatar
Simon Marlow committed
557
        (tso->flags & TSO_BLOCKEX) != 0) {
558
        debugTraceCap(DEBUG_sched, cap, "throwTo: thread %lu has blocked exceptions but is inside block", (unsigned long)tso->id);
Simon Marlow's avatar
Simon Marlow committed
559 560
    }

561
    if (tso->blocked_exceptions != END_BLOCKED_EXCEPTIONS_QUEUE
562 563
        && ((tso->flags & TSO_BLOCKEX) == 0
            || ((tso->flags & TSO_INTERRUPTIBLE) && interruptible(tso)))) {
564

565 566
        // We unblock just the first thread on the queue, and perform
        // its throw immediately.
567 568 569 570 571
    loop:
        msg = tso->blocked_exceptions;
        if (msg == END_BLOCKED_EXCEPTIONS_QUEUE) return 0;
        i = lockClosure((StgClosure*)msg);
        tso->blocked_exceptions = (MessageThrowTo*)msg->link;
572
        if (i == &stg_MSG_NULL_info) {
573 574 575 576
            unlockClosure((StgClosure*)msg,i);
            goto loop;
        }

577
        throwToSingleThreaded(cap, msg->target, msg->exception);
578 579 580
        source = msg->source;
        doneWithMsgThrowTo(msg);
        tryWakeupThread(cap, source);
581
        return 1;
582
    }
583
    return 0;
584 585
}

Simon Marlow's avatar
Simon Marlow committed
586
// awakenBlockedExceptionQueue(): Just wake up the whole queue of
587
// blocked exceptions.
Simon Marlow's avatar
Simon Marlow committed
588

589 590 591
void
awakenBlockedExceptionQueue (Capability *cap, StgTSO *tso)
{
592 593
    MessageThrowTo *msg;
    const StgInfoTable *i;
594
    StgTSO *source;
595 596 597 598

    for (msg = tso->blocked_exceptions; msg != END_BLOCKED_EXCEPTIONS_QUEUE;
         msg = (MessageThrowTo*)msg->link) {
        i = lockClosure((StgClosure *)msg);
599
        if (i != &stg_MSG_NULL_info) {
600 601 602
            source = msg->source;
            doneWithMsgThrowTo(msg);
            tryWakeupThread(cap, source);
603 604
        } else {
            unlockClosure((StgClosure *)msg,i);
605 606 607
        }
    }
    tso->blocked_exceptions = END_BLOCKED_EXCEPTIONS_QUEUE;
608
}
609 610 611 612 613 614

/* -----------------------------------------------------------------------------
   Remove a thread from blocking queues.

   This is for use when we raise an exception in another thread, which
   may be blocked.
615

616 617
   Precondition: we have exclusive access to the TSO, via the same set
   of conditions as throwToSingleThreaded() (c.f.).
618 619
   -------------------------------------------------------------------------- */

620 621 622 623 624 625 626 627 628 629 630 631 632 633 634 635 636 637 638 639 640 641
static void
removeFromMVarBlockedQueue (StgTSO *tso)
{
    StgMVar *mvar = (StgMVar*)tso->block_info.closure;
    StgMVarTSOQueue *q = (StgMVarTSOQueue*)tso->_link;

    if (q == (StgMVarTSOQueue*)END_TSO_QUEUE) {
        // already removed from this MVar
        return;
    }

    // Assume the MVar is locked. (not assertable; sometimes it isn't
    // actually WHITEHOLE'd).

    // We want to remove the MVAR_TSO_QUEUE object from the queue.  It
    // isn't doubly-linked so we can't actually remove it; instead we
    // just overwrite it with an IND if possible and let the GC short
    // it out.  However, we have to be careful to maintain the deque
    // structure:

    if (mvar->head == q) {
        mvar->head = q->link;
Simon Marlow's avatar
Simon Marlow committed
642
        OVERWRITE_INFO(q, &stg_IND_info);
643 644 645 646 647 648 649 650 651
        if (mvar->tail == q) {
            mvar->tail = (StgMVarTSOQueue*)END_TSO_QUEUE;
        }
    }
    else if (mvar->tail == q) {
        // we can't replace it with an IND in this case, because then
        // we lose the tail pointer when the GC shorts out the IND.
        // So we use MSG_NULL as a kind of non-dupable indirection;
        // these are ignored by takeMVar/putMVar.
Simon Marlow's avatar
Simon Marlow committed
652
        OVERWRITE_INFO(q, &stg_MSG_NULL_info);
653 654
    }
    else {
Simon Marlow's avatar
Simon Marlow committed
655
        OVERWRITE_INFO(q, &stg_IND_info);
656 657 658 659 660 661
    }

    // revoke the MVar operation
    tso->_link = END_TSO_QUEUE;
}

662 663 664 665 666 667
static void
removeFromQueues(Capability *cap, StgTSO *tso)
{
  switch (tso->why_blocked) {

  case NotBlocked:
668
  case ThreadMigrating:
669 670 671 672 673 674 675 676 677 678 679 680
      return;

  case BlockedOnSTM:
    // Be careful: nothing to do here!  We tell the scheduler that the
    // thread is runnable and we leave it to the stack-walking code to
    // abort the transaction while unwinding the stack.  We should
    // perhaps have a debugging test to make sure that this really
    // happens and that the 'zombie' transaction does not get
    // committed.
    goto done;

  case BlockedOnMVar:
681
  case BlockedOnMVarRead:
682
      removeFromMVarBlockedQueue(tso);
683 684 685
      goto done;

  case BlockedOnBlackHole:
686
      // nothing to do
687 688
      goto done;

689 690 691 692 693 694 695 696 697
  case BlockedOnMsgThrowTo:
  {
      MessageThrowTo *m = tso->block_info.throwto;
      // The message is locked by us, unless we got here via
      // deleteAllThreads(), in which case we own all the
      // capabilities.
      // ASSERT(m->header.info == &stg_WHITEHOLE_info);

      // unlock and revoke it at the same time
698
      doneWithMsgThrowTo(m);
699 700
      break;
  }
701 702 703 704 705 706 707

#if !defined(THREADED_RTS)
  case BlockedOnRead:
  case BlockedOnWrite:
#if defined(mingw32_HOST_OS)
  case BlockedOnDoProc:
#endif
708
      removeThreadFromDeQueue(cap, &blocked_queue_hd, &blocked_queue_tl, tso);
709 710 711 712 713 714 715 716 717
#if defined(mingw32_HOST_OS)
      /* (Cooperatively) signal that the worker thread should abort
       * the request.
       */
      abandonWorkRequest(tso->block_info.async_result->reqID);
#endif
      goto done;

  case BlockedOnDelay:
718
        removeThreadFromQueue(cap, &sleeping_queue, tso);
719
        goto done;
720 721 722
#endif

  default:
723
      barf("removeFromQueues: %d", tso->why_blocked);
724 725 726
  }

 done:
727 728
  tso->why_blocked = NotBlocked;
  appendToRunQueue(cap, tso);
729 730 731 732 733 734 735 736 737
}

/* -----------------------------------------------------------------------------
 * raiseAsync()
 *
 * The following function implements the magic for raising an
 * asynchronous exception in an existing thread.
 *
 * We first remove the thread from any queue on which it might be
738 739
 * blocked.  The possible blockages are MVARs, BLOCKING_QUEUESs, and
 * TSO blocked_exception queues.
740 741
 *
 * We strip the stack down to the innermost CATCH_FRAME, building
742
 * thunks in the heap for all the active computations, so they can
743 744 745
 * be restarted if necessary.  When we reach a CATCH_FRAME, we build
 * an application of the handler to the exception, and push it on
 * the top of the stack.
746
 *
747 748 749 750 751 752 753 754 755 756
 * How exactly do we save all the active computations?  We create an
 * AP_STACK for every UpdateFrame on the stack.  Entering one of these
 * AP_STACKs pushes everything from the corresponding update frame
 * upwards onto the stack.  (Actually, it pushes everything up to the
 * next update frame plus a pointer to the next AP_STACK object.
 * Entering the next AP_STACK object pushes more onto the stack until we
 * reach the last AP_STACK object - at which point the stack should look
 * exactly as it did when we killed the TSO and we can continue
 * execution by entering the closure on top of the stack.
 *
757
 * We can also kill a thread entirely - this happens if either (a) the
758 759 760 761 762 763 764 765 766 767 768
 * exception passed to raiseAsync is NULL, or (b) there's no
 * CATCH_FRAME on the stack.  In either case, we strip the entire
 * stack and replace the thread with a zombie.
 *
 * ToDo: in THREADED_RTS mode, this function is only safe if either
 * (a) we hold all the Capabilities (eg. in GC, or if there is only
 * one Capability), or (b) we own the Capability that the TSO is
 * currently blocked on or on the run queue of.
 *
 * -------------------------------------------------------------------------- */

769
StgTSO *
770
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
Ben Gamari's avatar
Ben Gamari committed
771
           bool stop_at_atomically, StgUpdateFrame *stop_here)
772
{
773
    const StgRetInfoTable *info;
774
    StgPtr sp, frame;
775
    StgClosure *updatee;
776
    uint32_t i;
777
    StgStack *stack;
778

779 780
    debugTraceCap(DEBUG_sched, cap,
                  "raising exception in thread %ld.", (long)tso->id);
781

782
#if defined(PROFILING)
783
    /*
784
     * Debugging tool: on raising an  exception, show where we are.
785
     * See also Exception.cmm:stg_raisezh.
786
     * This wasn't done for asynchronous exceptions originally; see #1450
787
     */
788
    if (RtsFlags.ProfFlags.showCCSOnException && exception != NULL)
789
    {
790
        fprintCCS_stderr(tso->prof.cccs,exception,tso);
791 792
    }
#endif
793 794
    // ASSUMES: the thread is not already complete or dead
    // Upper layers should deal with that.
795
    ASSERT(tso->what_next != ThreadComplete &&
796
           tso->what_next != ThreadKilled);
797

798
    // only if we own this TSO (except that deleteThread() calls this
799 800
    ASSERT(tso->cap == cap);

801
    stack = tso->stackobj;
802

803
    // mark it dirty; we're about to change its stack.
804
    dirty_TSO(cap, tso);
805
    dirty_STACK(cap, stack);
806

807
    sp = stack->sp;
808

809 810 811 812 813 814
    if (stop_here != NULL) {
        updatee = stop_here->updatee;
    } else {
        updatee = NULL;
    }

815 816 817 818
    // The stack freezing code assumes there's a closure pointer on
    // the top of the stack, so we have to arrange that this is the case...
    //
    if (sp[0] == (W_)&stg_enter_info) {
819
        sp++;
820
    } else {
821 822
        sp--;
        sp[0] = (W_)&stg_dummy_ret_closure;
823 824 825
    }

    frame = sp + 1;
826
    while (stop_here == NULL || frame < (StgPtr)stop_here) {
827

828 829 830 831 832 833 834 835 836 837 838 839 840 841 842
        // 1. Let the top of the stack be the "current closure"
        //
        // 2. Walk up the stack until we find either an UPDATE_FRAME or a
        // CATCH_FRAME.
        //
        // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
        // current closure applied to the chunk of stack up to (but not
        // including) the update frame.  This closure becomes the "current
        // closure".  Go back to step 2.
        //
        // 4. If it's a CATCH_FRAME, then leave the exception handler on
        // top of the stack applied to the exception.
        //
        // 5. If it's a STOP_FRAME, then kill the thread.
        //
843 844 845 846
        // 6. If it's an UNDERFLOW_FRAME, then continue with the next
        //    stack chunk.
        //
        // NB: if we pass an ATOMICALLY_FRAME then abort the associated
847
        // transaction
848

849
        info = get_ret_itbl((StgClosure *)frame);
850

851 852 853 854 855
        switch (info->i.type) {

        case UPDATE_FRAME:
        {
            StgAP_STACK * ap;
856
            uint32_t words;
857 858 859 860 861 862 863 864 865 866

            // First build an AP_STACK consisting of the stack chunk above the
            // current update frame, with the top word on the stack as the
            // fun field.
            //
            words = frame - sp - 1;
            ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));

            ap->size = words;
            ap->fun  = (StgClosure *)sp[0];
Ben Gamari's avatar
Ben Gamari committed
867

868
            sp++;
869
            for(i=0; i < words; ++i) {
870 871 872
                ap->payload[i] = (StgClosure *)*sp++;
            }

873
            write_barrier(); // XXX: Necessary?
874 875 876 877 878 879 880 881 882 883
            SET_HDR(ap,&stg_AP_STACK_info,
                    ((StgClosure *)frame)->header.prof.ccs /* ToDo */);
            TICK_ALLOC_UP_THK(WDS(words+1),0);

            //IF_DEBUG(scheduler,
            //       debugBelch("sched: Updating ");
            //       printPtr((P_)((StgUpdateFrame *)frame)->updatee);
            //       debugBelch(" with ");
            //       printObj((StgClosure *)ap);
            //  );
884

885 886 887 888 889 890 891 892 893 894 895
            if (((StgUpdateFrame *)frame)->updatee == updatee) {
                // If this update frame points to the same closure as
                // the update frame further down the stack
                // (stop_here), then don't perform the update.  We
                // want to keep the blackhole in this case, so we can
                // detect and report the loop (#2783).
                ap = (StgAP_STACK*)updatee;
            } else {
                // Perform the update
                // TODO: this may waste some work, if the thunk has
                // already been updated by another thread.
896
                updateThunk(cap, tso,
897
                            ((StgUpdateFrame *)frame)->updatee, (StgClosure *)ap);
898
            }
899

900 901 902 903 904
            sp += sizeofW(StgUpdateFrame) - 1;
            sp[0] = (W_)ap; // push onto stack
            frame = sp + 1;
            continue; //no need to bump frame
        }
905

906 907
        case UNDERFLOW_FRAME:
        {
908
            StgAP_STACK * ap;
909
            uint32_t words;
910 911 912 913 914 915 916 917 918 919 920

            // First build an AP_STACK consisting of the stack chunk above the
            // current update frame, with the top word on the stack as the
            // fun field.
            //
            words = frame - sp - 1;
            ap = (StgAP_STACK *)allocate(cap,AP_STACK_sizeW(words));

            ap->size = words;
            ap->fun  = (StgClosure *)sp[0];
            sp++;
921
            for(i=0; i < words; ++i) {
922 923 924
                ap->payload[i] = (StgClosure *)*sp++;
            }

925
            SET_HDR(ap,&stg_AP_STACK_NOUPD_info,stack->header.prof.ccs);
nfrisby's avatar
nfrisby committed
926
            TICK_ALLOC_SE_THK(WDS(words+1),0);
927 928 929 930 931 932 933 934 935 936 937 938 939

            stack->sp = sp;
            threadStackUnderflow(cap,tso);
            stack = tso->stackobj;
            sp = stack->sp;

            sp--;
            sp[0] = (W_)ap;
            frame = sp + 1;
            continue;
        }

        case STOP_FRAME:
940 941 942
        {
            // We've stripped the entire stack, the thread is now dead.
            tso->what_next = ThreadKilled;
943 944
            stack->sp = frame + sizeofW(StgStopFrame);
            goto done;
945 946 947 948 949 950 951 952 953 954 955 956 957 958 959 960 961 962 963 964 965 966 967 968 969 970 971 972 973
        }

        case CATCH_FRAME:
            // If we find a CATCH_FRAME, and we've got an exception to raise,
            // then build the THUNK raise(exception), and leave it on
            // top of the CATCH_FRAME ready to enter.
            //
        {
            StgCatchFrame *cf = (StgCatchFrame *)frame;
            StgThunk *raise;

            if (exception == NULL) break;

            // we've got an exception to raise, so let's pass it to the
            // handler in this frame.
            //
            raise = (StgThunk *)allocate(cap,sizeofW(StgThunk)+1);
            TICK_ALLOC_SE_THK(WDS(1),0);
            SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
            raise->payload[0] = exception;

            // throw away the stack from Sp up to the CATCH_FRAME.
            //
            sp = frame - 1;

            /* Ensure that async exceptions are blocked now, so we don't get
             * a surprise exception before we get around to executing the
             * handler.
             */
974 975 976 977 978 979
            tso->flags |= TSO_BLOCKEX;
            if ((cf->exceptions_blocked & TSO_INTERRUPTIBLE) == 0) {
                tso->flags &= ~TSO_INTERRUPTIBLE;
            } else {
                tso->flags |= TSO_INTERRUPTIBLE;
            }
980

981 982 983 984 985
            /* Put the newly-built THUNK on top of the stack, ready to execute
             * when the thread restarts.
             */
            sp[0] = (W_)raise;
            sp[-1] = (W_)&stg_enter_info;
986
            stack->sp = sp-1;
987
            tso->what_next = ThreadRunGHC;
988
            goto done;
989 990 991 992 993 994
        }

        case ATOMICALLY_FRAME:
            if (stop_at_atomically) {
                ASSERT(tso->trec->enclosing_trec == NO_TREC);
                stmCondemnTransaction(cap, tso -> trec);
995
                stack->sp = frame - 2;
996 997 998 999 1000 1001 1002 1003
                // The ATOMICALLY_FRAME expects to be returned a
                // result from the transaction, which it stores in the
                // stack frame.  Hence we arrange to return a dummy
                // result, so that the GC doesn't get upset (#3578).
                // Perhaps a better way would be to have a different
                // ATOMICALLY_FRAME instance for condemned
                // transactions, but I don't fully understand the
                // interaction with STM invariants.
1004
                stack->sp[1] = (W_)&stg_NO_TREC_closure;
1005
                stack->sp[0] = (W_)&stg_ret_p_info;
1006 1007
                tso->what_next = ThreadRunGHC;
                goto done;
1008
            }
1009 1010 1011 1012 1013 1014 1015 1016 1017 1018 1019 1020 1021 1022 1023 1024 1025 1026 1027 1028 1029 1030 1031 1032 1033 1034 1035 1036 1037 1038 1039 1040 1041 1042 1043 1044 1045 1046 1047 1048 1049 1050 1051 1052 1053 1054 1055
            else
            {
                // Freezing an STM transaction.  Just aborting the
                // transaction would be wrong; this is what we used to
                // do, and it goes wrong if the ATOMICALLY_FRAME ever
                // gets back onto the stack again, which it will do if
                // the transaction is inside unsafePerformIO or
                // unsafeInterleaveIO and hence inside an UPDATE_FRAME.
                //
                // So we want to make it so that if the enclosing
                // computation is resumed, we will re-execute the
                // transaction.  We therefore:
                //
                //   1. abort the current transaction
                //   3. replace the stack up to and including the
                //      atomically frame with a closure representing
                //      a call to "atomically x", where x is the code
                //      of the transaction.
                //   4. continue stripping the stack
                //
                StgTRecHeader *trec = tso->trec;
                StgTRecHeader *outer = trec->enclosing_trec;

                StgThunk *atomically;
                StgAtomicallyFrame *af = (StgAtomicallyFrame*)frame;

                debugTraceCap(DEBUG_stm, cap,
                              "raiseAsync: freezing atomically frame")
                stmAbortTransaction(cap, trec);
                stmFreeAbortedTRec(cap, trec);
                tso->trec = outer;

                atomically = (StgThunk*)allocate(cap,sizeofW(StgThunk)+1);
                TICK_ALLOC_SE_THK(1,0);
                SET_HDR(atomically,&stg_atomically_info,af->header.prof.ccs);
                atomically->payload[0] = af->code;

                // discard stack up to and including the ATOMICALLY_FRAME
                frame += sizeofW(StgAtomicallyFrame);
                sp = frame - 1;

                // replace the ATOMICALLY_FRAME with call to atomically#
                sp[0] = (W_)atomically;
                continue;
            }

        case CATCH_STM_FRAME:
1056
        case CATCH_RETRY_FRAME:
1057 1058 1059 1060 1061 1062
            // CATCH frames within an atomically block: abort the
            // inner transaction and continue.  Eventually we will
            // hit the outer transaction that will get frozen (see
            // above).
            //
            // In this case (unlike ordinary exceptions) we do not care
1063 1064 1065
            // whether the transaction is valid or not because its
            // possible validity cannot have caused the exception
            // and will not be visible after the abort.
1066
        {
1067
            StgTRecHeader *trec = tso -> trec;
1068
            StgTRecHeader *outer = trec -> enclosing_trec;
1069
            debugTraceCap(DEBUG_stm, cap,
1070
                          "found atomically block delivering async exception");
1071
            stmAbortTransaction(cap, trec);
1072
            stmFreeAbortedTRec(cap, trec);
1073
            tso -> trec = outer;
1074
            break;
1075 1076
        };

1077 1078 1079
        default:
            break;
        }
1080

1081 1082
        // move on to the next stack frame
        frame += stack_frame_sizeW((StgClosure *)frame);
1083 1084
    }

1085 1086 1087 1088 1089 1090 1091
done:
    IF_DEBUG(sanity, checkTSO(tso));

    // wake it up
    if (tso->why_blocked != NotBlocked) {
        tso->why_blocked = NotBlocked;
        appendToRunQueue(cap,tso);
1092
    }
1093 1094

    return tso;
1095
}