/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2000-2008
 *
 * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
 *
 -------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Schedule.h"
#include "RtsUtils.h"
#include "Trace.h"
#include "Prelude.h"
#include "Sparks.h"

#if defined(THREADED_RTS)

SparkPool *
allocSparkPool( void )
22
{
23
    return newWSDeque(RtsFlags.ParFlags.maxLocalSparks);
24 25
}

/* Free a spark pool previously created by allocSparkPool; delegates to
   the underlying work-stealing deque destructor. */
void
freeSparkPool (SparkPool *pool)
{
    freeWSDeque(pool);
}

/* -----------------------------------------------------------------------------
 *
 * Turn a spark into a real thread
 *
 * -------------------------------------------------------------------------- */

void
39
createSparkThread (Capability *cap)
40 41 42
{
    StgTSO *tso;

43
    tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize, 
44
                          (StgClosure *)runSparks_closure);
45

46
    traceEventCreateSparkThread(cap, tso->id);
47

48 49 50
    appendToRunQueue(cap,tso);
}

/* --------------------------------------------------------------------------
 * newSpark: create a new spark, as a result of calling "par"
 * Called directly from STG.
 * -------------------------------------------------------------------------- */

StgInt
newSpark (StgRegTable *reg, StgClosure *p)
{
    Capability *cap = regTableToCapability(reg);

    if (fizzledSpark(p)) {
        /* The closure is already evaluated: a dud, not worth sparking. */
        cap->spark_stats.dud++;
        traceEventSparkDud(cap);
    } else if (pushWSDeque(cap->sparks, p)) {
        /* Successfully queued in this capability's spark pool. */
        cap->spark_stats.created++;
        traceEventSparkCreate(cap);
    } else {
        /* Push failed: the spark pool is full, so the spark is dropped. */
        cap->spark_stats.overflowed++;
        traceEventSparkOverflow(cap);
    }

    return 1;
}

/* --------------------------------------------------------------------------
 * Remove all sparks from the spark queues which should not spark any
 * more.  Called after GC. We assume exclusive access to the structure
 * and replace all sparks in the queue, see explanation below. At exit,
 * the spark pool only contains sparkable closures.
 * -------------------------------------------------------------------------- */

// In-place compaction of the spark deque after GC: live, still-worthwhile
// sparks are kept (updated to their new addresses); fizzled or
// GC'd sparks are dropped and counted in cap->spark_stats.
void
pruneSparkQueue (Capability *cap)
{
    SparkPool *pool;
    StgClosurePtr spark, tmp, *elements;
    nat n, pruned_sparks; // stats only
    StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
    const StgInfoTable *info;

    n = 0;
    pruned_sparks = 0;

    pool = cap->sparks;

    // it is possible that top > bottom, indicating an empty pool.  We
    // fix that here; this is only necessary because the loop below
    // assumes it.
    if (pool->top > pool->bottom)
        pool->top = pool->bottom;

    // Take this opportunity to reset top/bottom modulo the size of
    // the array, to avoid overflow.  This is only possible because no
    // stealing is happening during GC.
    pool->bottom  -= pool->top & ~pool->moduloSize;
    pool->top     &= pool->moduloSize;
    pool->topBound = pool->top;

    debugTrace(DEBUG_sparks,
               "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);

    ASSERT_WSDEQUE_INVARIANTS(pool);

    elements = (StgClosurePtr *)pool->elements;

    /* We have exclusive access to the structure here, so we can reset
       bottom and top counters, and prune invalid sparks. Contents are
       copied in-place if they are valuable, otherwise discarded. The
       routine uses "real" indices t and b, starts by computing them
       as the modulus size of top and bottom,

       Copying:

       At the beginning, the pool structure can look like this:
       ( bottom % size >= top % size , no wrap-around)
                  t          b
       ___________***********_________________

       or like this ( bottom % size < top % size, wrap-around )
                  b         t
       ***********__________******************
       As we need to remove useless sparks anyway, we make one pass
       between t and b, moving valuable content to b and subsequent
       cells (wrapping around when the size is reached).

                     b      t
       ***********OOO_______XX_X__X?**********
                     ^____move?____/

       After this movement, botInd becomes the new bottom, and old
       bottom becomes the new top index, both as indices in the array
       size range.
    */
    // starting here
    currInd = (pool->top) & (pool->moduloSize); // mod

    // copies of evacuated closures go to space from botInd on
    // we keep oldBotInd to know when to stop
    oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod

    // on entry to loop, we are within the bounds
    ASSERT( currInd < pool->size && botInd  < pool->size );

    while (currInd != oldBotInd ) {
      /* must use != here, wrap-around at size
         subtle: loop not entered if queue empty
       */

      /* check element at currInd. if valuable, evacuate and move to
         botInd, otherwise move on */
      spark = elements[currInd];

      // We have to be careful here: in the parallel GC, another
      // thread might evacuate this closure while we're looking at it,
      // so grab the info pointer just once.
      if (GET_CLOSURE_TAG(spark) != 0) {
          // Tagged pointer is a value, so the spark has fizzled.  It
          // probably never happens that we get a tagged pointer in
          // the spark pool, because we would have pruned the spark
          // during the previous GC cycle if it turned out to be
          // evaluated, but it doesn't hurt to have this check for
          // robustness.
          pruned_sparks++;
          cap->spark_stats.fizzled++;
          traceEventSparkFizzle(cap);
      } else {
          info = spark->header.info;
          if (IS_FORWARDING_PTR(info)) {
              // Closure was evacuated by GC; follow the forwarding
              // pointer to its new address.
              tmp = (StgClosure*)UN_FORWARDING_PTR(info);
              /* if valuable work: shift inside the pool */
              if (closure_SHOULD_SPARK(tmp)) {
                  elements[botInd] = tmp; // keep entry (new address)
                  botInd++;
                  n++;
              } else {
                  pruned_sparks++; // discard spark
                  cap->spark_stats.fizzled++;
                  traceEventSparkFizzle(cap);
              }
          } else if (HEAP_ALLOCED(spark)) {
              if ((Bdescr((P_)spark)->flags & BF_EVACUATED)) {
                  // Live (evacuated) closure, not forwarded from here.
                  if (closure_SHOULD_SPARK(spark)) {
                      elements[botInd] = spark; // keep entry (new address)
                      botInd++;
                      n++;
                  } else {
                      pruned_sparks++; // discard spark
                      cap->spark_stats.fizzled++;
                      traceEventSparkFizzle(cap);
                  }
              } else {
                  // Not evacuated: the closure died during this GC.
                  pruned_sparks++; // discard spark
                  cap->spark_stats.gcd++;
                  traceEventSparkGC(cap);
              }
          } else {
              // Static (non-heap) closure.
              if (INFO_PTR_TO_STRUCT(info)->type == THUNK_STATIC) {
                  if (*THUNK_STATIC_LINK(spark) != NULL) {
                      elements[botInd] = spark; // keep entry (new address)
                      botInd++;
                      n++;
                  } else {
                      pruned_sparks++; // discard spark
                      cap->spark_stats.gcd++;
                      traceEventSparkGC(cap);
                  }
              } else {
                  pruned_sparks++; // discard spark
                  cap->spark_stats.fizzled++;
                  traceEventSparkFizzle(cap);
              }
          }
      }

      currInd++;

      // in the loop, we may reach the bounds, and instantly wrap around
      ASSERT( currInd <= pool->size && botInd <= pool->size );
      if ( currInd == pool->size ) { currInd = 0; }
      if ( botInd == pool->size )  { botInd = 0;  }

    } // while-loop over spark pool elements

    ASSERT(currInd == oldBotInd);

    pool->top = oldBotInd; // where we started writing
    pool->topBound = pool->top;

    pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
    // first free place we did not use (corrected by wraparound)

    debugTrace(DEBUG_sparks, "pruned %d sparks", pruned_sparks);

    debugTrace(DEBUG_sparks,
               "new spark queue len=%ld; (hd=%ld; tl=%ld)",
               sparkPoolSize(pool), pool->bottom, pool->top);

    ASSERT_WSDEQUE_INVARIANTS(pool);
}

256 257
/* GC for the spark pool, called inside Capability.c for all
   capabilities in turn. Blindly "evac"s complete spark pool. */
258 259 260 261
void
traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
{
    StgClosure **sparkp;
262 263
    SparkPool *pool;
    StgWord top,bottom, modMask;
264
    
265 266
    pool = cap->sparks;

267
    ASSERT_WSDEQUE_INVARIANTS(pool);
268 269 270

    top = pool->top;
    bottom = pool->bottom;
271
    sparkp = (StgClosurePtr*)pool->elements;
272 273 274 275 276 277 278 279 280
    modMask = pool->moduloSize;

    while (top < bottom) {
    /* call evac for all closures in range (wrap-around via modulo)
     * In GHC-6.10, evac takes an additional 1st argument to hold a
     * GC-specific register, see rts/sm/GC.c::mark_root()
     */
      evac( user , sparkp + (top & modMask) ); 
      top++;
281
    }
282

283
    debugTrace(DEBUG_sparks,
284
               "traversed spark queue, len=%ld; (hd=%ld; tl=%ld)",
285 286 287 288 289 290 291 292 293 294
               sparkPoolSize(pool), pool->bottom, pool->top);
}

/* ----------------------------------------------------------------------------
 * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
 * capabilities) and its size. Accesses all spark pools and equally
 * distributes the sparks among them.
 *
 * Could be called after GC, before Cap. release, from scheduler. 
 * -------------------------------------------------------------------------- */
void balanceSparkPoolsCaps(nat n_caps, Capability caps[])
   GNUC3_ATTRIBUTE(__noreturn__);

void balanceSparkPoolsCaps(nat n_caps STG_UNUSED, 
                           Capability caps[] STG_UNUSED) {
  // Unimplemented stub: always aborts via barf(), which is why the
  // prototype above carries the __noreturn__ attribute.
  barf("not implemented");
}

#else

// Non-THREADED_RTS stub: there is no spark pool, so "par" discards the
// closure and succeeds trivially.
StgInt
newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
{
    /* nothing */
    return 1;
}

#endif /* THREADED_RTS */