Messages.c 11.7 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2010
 *
 * Inter-Capability message passing
 *
 * --------------------------------------------------------------------------*/

#include "Rts.h"
#include "Messages.h"
#include "Trace.h"
#include "Capability.h"
#include "Schedule.h"
#include "Threads.h"
#include "RaiseAsync.h"
#include "sm/Storage.h"

/* ----------------------------------------------------------------------------
   Send a message to another Capability
   ------------------------------------------------------------------------- */

Ben Gamari's avatar
Ben Gamari committed
22
#if defined(THREADED_RTS)
23 24 25 26 27

void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
{
    ACQUIRE_LOCK(&to_cap->lock);

Ben Gamari's avatar
Ben Gamari committed
28
#if defined(DEBUG)
29 30
    {
        const StgInfoTable *i = msg->header.info;
31
        if (i != &stg_MSG_THROWTO_info &&
32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
            i != &stg_MSG_BLACKHOLE_info &&
            i != &stg_MSG_TRY_WAKEUP_info &&
            i != &stg_IND_info && // can happen if a MSG_BLACKHOLE is revoked
            i != &stg_WHITEHOLE_info) {
            barf("sendMessage: %p", i);
        }
    }
#endif

    msg->link = to_cap->inbox;
    to_cap->inbox = msg;

    recordClosureMutated(from_cap,(StgClosure*)msg);

    if (to_cap->running_task == NULL) {
47
        to_cap->running_task = myTask();
48
            // precond for releaseCapability_()
Ben Gamari's avatar
Ben Gamari committed
49
        releaseCapability_(to_cap,false);
50
    } else {
51
        interruptCapability(to_cap);
52 53 54 55 56 57 58 59 60 61 62
    }

    RELEASE_LOCK(&to_cap->lock);
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
   Handle a message
   ------------------------------------------------------------------------- */

Ben Gamari's avatar
Ben Gamari committed
63
#if defined(THREADED_RTS)
64 65 66 67 68 69 70 71 72

void
executeMessage (Capability *cap, Message *m)
{
    const StgInfoTable *i;

loop:
    write_barrier(); // allow m->header to be modified by another thread
    i = m->header.info;
73
    if (i == &stg_MSG_TRY_WAKEUP_info)
74 75
    {
        StgTSO *tso = ((MessageWakeup *)m)->tso;
76
        debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld",
77
                      (W_)tso->id);
78 79 80 81 82
        tryWakeupThread(cap, tso);
    }
    else if (i == &stg_MSG_THROWTO_info)
    {
        MessageThrowTo *t = (MessageThrowTo *)m;
83
        uint32_t r;
84 85 86 87 88 89 90 91
        const StgInfoTable *i;

        i = lockClosure((StgClosure*)m);
        if (i != &stg_MSG_THROWTO_info) {
            unlockClosure((StgClosure*)m, i);
            goto loop;
        }

92
        debugTraceCap(DEBUG_sched, cap, "message: throwTo %ld -> %ld",
93
                      (W_)t->source->id, (W_)t->target->id);
94 95 96 97 98 99 100

        ASSERT(t->source->why_blocked == BlockedOnMsgThrowTo);
        ASSERT(t->source->block_info.closure == (StgClosure *)m);

        r = throwToMsg(cap, t);

        switch (r) {
101
        case THROWTO_SUCCESS: {
102
            // this message is done
103 104 105
            StgTSO *source = t->source;
            doneWithMsgThrowTo(t);
            tryWakeupThread(cap, source);
106
            break;
107
        }
108 109 110 111 112 113 114 115
        case THROWTO_BLOCKED:
            // unlock the message
            unlockClosure((StgClosure*)m, &stg_MSG_THROWTO_info);
            break;
        }
    }
    else if (i == &stg_MSG_BLACKHOLE_info)
    {
116
        uint32_t r;
117 118 119 120 121 122 123 124
        MessageBlackHole *b = (MessageBlackHole*)m;

        r = messageBlackHole(cap, b);
        if (r == 0) {
            tryWakeupThread(cap, b->tso);
        }
        return;
    }
125
    else if (i == &stg_IND_info || i == &stg_MSG_NULL_info)
126 127 128 129 130 131
    {
        // message was revoked
        return;
    }
    else if (i == &stg_WHITEHOLE_info)
    {
132 133 134
#if defined(PROF_SPIN)
        ++whitehole_executeMessage_spin;
#endif
135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
        goto loop;
    }
    else
    {
        barf("executeMessage: %p", i);
    }
}

#endif

/* ----------------------------------------------------------------------------
   Handle a MSG_BLACKHOLE message

   This is called from two places: either we just entered a BLACKHOLE
   (stg_BLACKHOLE_info), or we received a MSG_BLACKHOLE in our
150
   cap->inbox.
151 152

   We need to establish whether the BLACKHOLE belongs to
153
   this Capability, and
154 155 156 157 158 159 160 161 162 163
     - if so, arrange to block the current thread on it
     - otherwise, forward the message to the right place

   Returns:
     - 0 if the blocked thread can be woken up by the caller
     - 1 if the thread is still blocked, and we promise to send a MSG_TRY_WAKEUP
       at some point in the future.

   ------------------------------------------------------------------------- */

164
uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
165 166 167 168
{
    const StgInfoTable *info;
    StgClosure *p;
    StgBlockingQueue *bq;
169
    StgClosure *bh = UNTAG_CLOSURE(msg->bh);
170 171
    StgTSO *owner;

Austin Seipp's avatar
Austin Seipp committed
172 173
    debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on "
                  "blackhole %p", (W_)msg->tso->id, msg->bh);
174 175

    info = bh->header.info;
176
    load_load_barrier();  // See Note [Heap memory barriers] in SMP.h
177 178 179 180 181

    // If we got this message in our inbox, it might be that the
    // BLACKHOLE has already been updated, and GC has shorted out the
    // indirection, so the pointer no longer points to a BLACKHOLE at
    // all.
182 183
    if (info != &stg_BLACKHOLE_info &&
        info != &stg_CAF_BLACKHOLE_info &&
184
        info != &__stg_EAGER_BLACKHOLE_info &&
185 186 187 188 189 190 191 192
        info != &stg_WHITEHOLE_info) {
        // if it is a WHITEHOLE, then a thread is in the process of
        // trying to BLACKHOLE it.  But we know that it was once a
        // BLACKHOLE, so there is at least a valid pointer in the
        // payload, so we can carry on.
        return 0;
    }

193 194
    // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
    // or a value.
195
loop:
196 197 198
    // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
    // and turns this into an infinite loop.
    p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
199
    info = p->header.info;
200
    load_load_barrier();  // See Note [Heap memory barriers] in SMP.h
201 202 203 204 205 206 207

    if (info == &stg_IND_info)
    {
        // This could happen, if e.g. we got a BLOCKING_QUEUE that has
        // just been replaced with an IND by another thread in
        // updateThunk().  In which case, if we read the indirectee
        // again we should get the value.
208
        // See Note [BLACKHOLE pointing to IND] in sm/Evac.c
209 210 211 212 213
        goto loop;
    }

    else if (info == &stg_TSO_info)
    {
214
        owner = (StgTSO*)p;
215

Ben Gamari's avatar
Ben Gamari committed
216
#if defined(THREADED_RTS)
217 218
        if (owner->cap != cap) {
            sendMessage(cap, owner->cap, (Message*)msg);
Austin Seipp's avatar
Austin Seipp committed
219 220
            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                          owner->cap->no);
221 222 223 224 225 226 227 228
            return 1;
        }
#endif
        // owner is the owner of the BLACKHOLE, and resides on this
        // Capability.  msg->tso is the first thread to block on this
        // BLACKHOLE, so we first create a BLOCKING_QUEUE object.

        bq = (StgBlockingQueue*)allocate(cap, sizeofW(StgBlockingQueue));
229

230 231 232 233
        // initialise the BLOCKING_QUEUE object
        bq->bh = bh;
        bq->queue = msg;
        bq->owner = owner;
234

235
        msg->link = (MessageBlackHole*)END_TSO_QUEUE;
236

237 238 239 240 241
        // All BLOCKING_QUEUES are linked in a list on owner->bq, so
        // that we can search through them in the event that there is
        // a collision to update a BLACKHOLE and a BLOCKING_QUEUE
        // becomes orphaned (see updateThunk()).
        bq->link = owner->bq;
242 243 244 245 246
        SET_HDR(bq, &stg_BLOCKING_QUEUE_DIRTY_info, CCS_SYSTEM);
        // We are about to make the newly-constructed message visible to other cores;
        // a barrier is necessary to ensure that all writes are visible.
        // See Note [Heap memory barriers] in SMP.h.
        write_barrier();
247
        dirty_TSO(cap, owner); // we will modify owner->bq
248
        owner->bq = bq;
249 250 251 252 253 254 255 256 257 258 259

        // If the owner of the blackhole is currently runnable, then
        // bump it to the front of the run queue.  This gives the
        // blocked-on thread a little boost which should help unblock
        // this thread, and may avoid a pile-up of other threads
        // becoming blocked on the same BLACKHOLE (#3838).
        //
        // NB. we check to make sure that the owner is not the same as
        // the current thread, since in that case it will not be on
        // the run queue.
        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
260
            promoteInRunQueue(cap, owner);
261 262
        }

263
        // point to the BLOCKING_QUEUE from the BLACKHOLE
264
        write_barrier(); // make the BQ visible, see Note [Heap memory barriers].
265
        IF_NONMOVING_WRITE_BARRIER_ENABLED {
266 267
            updateRemembSetPushClosure(cap, (StgClosure*)p);
        }
268 269 270
        ((StgInd*)bh)->indirectee = (StgClosure *)bq;
        recordClosureMutated(cap,bh); // bh was mutated

271
        debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
272
                      (W_)msg->tso->id, (W_)owner->id);
273 274 275

        return 1; // blocked
    }
276
    else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
277 278 279 280 281 282
             info == &stg_BLOCKING_QUEUE_DIRTY_info)
    {
        StgBlockingQueue *bq = (StgBlockingQueue *)p;

        ASSERT(bq->bh == bh);

283
        owner = bq->owner;
284 285 286

        ASSERT(owner != END_TSO_QUEUE);

Ben Gamari's avatar
Ben Gamari committed
287
#if defined(THREADED_RTS)
288 289
        if (owner->cap != cap) {
            sendMessage(cap, owner->cap, (Message*)msg);
Austin Seipp's avatar
Austin Seipp committed
290 291
            debugTraceCap(DEBUG_sched, cap, "forwarding message to cap %d",
                          owner->cap->no);
292 293 294 295
            return 1;
        }
#endif

296
        IF_NONMOVING_WRITE_BARRIER_ENABLED {
297 298 299 300
            // We are about to overwrite bq->queue; make sure its current value
            // makes it into the update remembered set
            updateRemembSetPushClosure(cap, (StgClosure*)bq->queue);
        }
301 302
        msg->link = bq->queue;
        bq->queue = msg;
303 304
        // No barrier is necessary here: we are only exposing the
        // closure to the GC. See Note [Heap memory barriers] in SMP.h.
305 306 307 308
        recordClosureMutated(cap,(StgClosure*)msg);

        if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
            bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
309 310
            // No barrier is necessary here: we are only exposing the
            // closure to the GC. See Note [Heap memory barriers] in SMP.h.
311
            recordClosureMutated(cap,(StgClosure*)bq);
312 313
        }

314 315 316
        debugTraceCap(DEBUG_sched, cap,
                      "thread %d blocked on existing BLOCKING_QUEUE "
                      "owned by thread %d",
317
                      (W_)msg->tso->id, (W_)owner->id);
318

319 320
        // See above, #3838
        if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
321
            promoteInRunQueue(cap, owner);
322 323
        }

324 325
        return 1; // blocked
    }
326

327 328 329
    return 0; // not blocked
}

330 331 332 333 334 335 336 337 338 339 340 341
// A shorter version of messageBlackHole(), that just returns the
// owner (or NULL if the owner cannot be found, because the blackhole
// has been updated in the meantime).

StgTSO * blackHoleOwner (StgClosure *bh)
{
    const StgInfoTable *info;
    StgClosure *p;

    info = bh->header.info;

    if (info != &stg_BLACKHOLE_info &&
342
        info != &stg_CAF_BLACKHOLE_info &&
343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361
        info != &__stg_EAGER_BLACKHOLE_info &&
        info != &stg_WHITEHOLE_info) {
        return NULL;
    }

    // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
    // or a value.
loop:
    // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
    // and turns this into an infinite loop.
    p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
    info = p->header.info;

    if (info == &stg_IND_info) goto loop;

    else if (info == &stg_TSO_info)
    {
        return (StgTSO*)p;
    }
362
    else if (info == &stg_BLOCKING_QUEUE_CLEAN_info ||
363 364 365 366 367
             info == &stg_BLOCKING_QUEUE_DIRTY_info)
    {
        StgBlockingQueue *bq = (StgBlockingQueue *)p;
        return bq->owner;
    }
368

369 370
    return NULL; // not blocked
}