STM.c 33.4 KB
Newer Older
1
2
/* -----------------------------------------------------------------------------
 *
3
 * (c) The GHC Team 1998-2005
4
5
6
 * 
 * STM implementation.
 *
7
8
 * Overview
 * --------
9
 *
10
11
12
13
14
15
 * See the PPoPP 2005 paper "Composable memory transactions".  In summary, 
 * each transaction has a TRec (transaction record) holding entries for each of the
 * TVars (transactional variables) that it has accessed.  Each entry records
 * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
 * the transaction wants to write to the TVar, (d) during commit, the identity of
 * the TRec that wrote the expected value.  
16
 *
17
18
19
20
21
22
23
 * Separate TRecs are used for each level in a nest of transactions.  This allows
 * a nested transaction to be aborted without condemning its enclosing transactions.
 * This is needed in the implementation of catchRetry.  Note that the "expected value"
 * in a nested transaction's TRec is the value expected to be *held in memory* if
 * the transaction commits -- not the "new value" stored in one of the enclosing
 * transactions.  This means that validation can be done without searching through
 * a nest of TRecs.
24
 *
25
26
 * Concurrency control
 * -------------------
27
 *
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 * Three different concurrency control schemes can be built according to the settings
 * in STM.h:
 * 
 * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
 * In the Haskell RTS this means it is suitable only for non-SMP builds.
 *
 * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
 * an invocation on the STM interface.  Note that this does not mean that 
 * transactions are simply serialized -- the lock is only held *within* the 
 * implementation of stmCommitTransaction, stmWait etc.
 *
 * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
 * and, when committing a transaction, no locks are acquired for TVars that have
 * been read but not updated.
 *
 * Concurrency control is implemented in the functions:
 *
 *    lock_stm
 *    unlock_stm
 *    lock_tvar / cond_lock_tvar
 *    unlock_tvar
 *
 * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
 * implementation of these functions.  
 *
 * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
 * using STM_CG_LOCK, and otherwise they are no-ops.
55
 *
56
57
58
59
60
 * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they 
 * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
 * as the actual business of manipulating a lock (present only in STM_FG_LOCKS
 * builds).  This is because locking a TVar is implemented by writing the lock
 * holder's TRec into the TVar's current_value field:
61
 *
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
 *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
 *               it contained.
 *
 *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
 *               contains a specified value.  Return TRUE if this succeeds,
 *               FALSE otherwise.
 *
 *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
 *               storing a specified value in place of the lock entry.
 *
 * Using these operations, the typical pattern of a commit/validate/wait operation
 * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that 
 * the TVars that were only read from still contain their expected values, 
 * (d) release the locks on the TVars, writing updates to them in the case of a 
 * commit, (e) unlock the STM.
 *
 * Queues of waiting threads hang off the first_wait_queue_entry field of each
 * TVar.  This may only be manipulated when holding that TVar's lock.  In
 * particular, when a thread is putting itself to sleep, it mustn't release
 * the TVar's lock until it has added itself to the wait queue and marked its
 * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
83
84
85
86
87
88
89
90
 *
 * ---------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "Schedule.h"
91
#include "SMP.h"
92
93
94
95
96
97
#include "STM.h"
#include "Storage.h"

#include <stdlib.h>
#include <stdio.h>

98
#define TRUE 1
99
#define FALSE 0
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115

// ACQ_ASSERT is used for assertions which are only required for SMP builds with
// fine-grained locking. 

#if defined(STM_FG_LOCKS)
// Fine-grained locking build: ACQ_ASSERT checks are live and NACQ_ASSERT
// checks expand to nothing.
#define ACQ_ASSERT(_X) ASSERT(_X)
#define NACQ_ASSERT(_X) /*Nothing*/
#else
// Every other build: the roles are swapped, so NACQ_ASSERT checks are live
// and ACQ_ASSERT checks expand to nothing.
#define ACQ_ASSERT(_X) /*Nothing*/
#define NACQ_ASSERT(_X) ASSERT(_X)
#endif

/*......................................................................*/

// If SHAKE is defined then validation will sometimes spuriously fail.  This helps test
// unusual code paths if genuine contention is rare.
116
117
118
119
120
121
122
123
124
125
126
127
128
129

// DEBUG builds define SHAKE and make TRACE forward its arguments to
// debugBelch inside IF_DEBUG(stm, ...).  In all other builds TRACE expands
// to nothing, so its arguments are never evaluated.
#if defined(DEBUG)
#define SHAKE
#define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
#else
#define TRACE(_x...) /*Nothing*/
#endif

#ifdef SHAKE
static const int do_shake = TRUE;
#else
static const int do_shake = FALSE;
#endif
static int shake_ctr = 0;
130
static int shake_lim = 1;
131
132
133

static int shake(void) {
  if (do_shake) {
134
135
136
    if (((shake_ctr++) % shake_lim) == 0) {
      shake_ctr = 1;
      shake_lim ++;
137
138
139
140
141
142
143
144
145
146
147
148
149
      return TRUE;
    } 
    return FALSE;
  } else {
    return FALSE;
  }
}

/*......................................................................*/

// Helper macros for iterating over entries within a transaction
// record

150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// FOR_EACH_ENTRY(_t, _x, CODE) runs CODE once for every entry of the
// transaction record _t, with _x bound to a pointer to the TRecEntry.
//
// The walk starts at _t's current_chunk, visiting entries 0 up to (not
// including) its next_entry_idx, and then follows prev_chunk links; every
// earlier chunk is scanned for the full TREC_CHUNK_NUM_ENTRIES entries.
//
// BREAK_FOR_EACH leaves the walk early by jumping to the exit_for_each
// label.  Because that label name is fixed, the macro can be expanded at
// most once per function.  The "if (FALSE) goto" after the label is
// presumably there so the label is always referenced -- TODO confirm.
//
// CODE is a macro argument: any comma in it that is not inside parentheses
// would be treated as an argument separator by the preprocessor.
//
// The trace prints the limit through an explicit (unsigned long) cast with
// %lu so that the conversion specifier matches the argument type whatever
// the width of StgWord is.
#define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
  StgTRecHeader *__t = (_t);                                                    \
  StgTRecChunk *__c = __t -> current_chunk;                                     \
  StgWord __limit = __c -> next_entry_idx;                                      \
  TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%lu\n",                    \
        __t, __c, (unsigned long) __limit);                                     \
  while (__c != END_STM_CHUNK_LIST) {                                           \
    StgWord __i;                                                                \
    for (__i = 0; __i < __limit; __i ++) {                                      \
      TRecEntry *_x = &(__c -> entries[__i]);                                   \
      do { CODE } while (0);                                                    \
    }                                                                           \
    __c = __c -> prev_chunk;                                                    \
    __limit = TREC_CHUNK_NUM_ENTRIES;                                           \
  }                                                                             \
 exit_for_each:                                                                 \
  if (FALSE) goto exit_for_each;                                                \
} while (0)

#define BREAK_FOR_EACH goto exit_for_each
     
/*......................................................................*/

172
173
#if defined(STM_UNIPROC)
// Read by the commit functions: when FALSE, stmCommitTransaction acquires
// every TVar in the trec and the check_read_only pass is skipped.
static const StgBool use_read_phase = FALSE;
174

175
176
177
// lock_stm (STM_UNIPROC) : takes no lock; it only emits a trace line.
static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()\n", trec);
}
178

179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// unlock_stm (STM_UNIPROC) : counterpart of lock_stm; only emits a trace line.
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()\n", trec);
}

// lock_tvar (STM_UNIPROC) : no lock is taken in this build; the function
// simply reports the value currently stored in the TVar.
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                             StgTVar *s STG_UNUSED) {
  TRACE("%p : lock_tvar(%p)\n", trec, s);
  return s -> current_value;
}

// unlock_tvar (STM_UNIPROC) : there is no lock to release.  The value c is
// stored into the TVar only when force_update is set; otherwise the TVar is
// left untouched.
static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s STG_UNUSED,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p)\n", trec, s);
  if (!force_update) {
    return;
  }
  s -> current_value = c;
}
200

201
202
203
204
205
206
207
208
209
// cond_lock_tvar (STM_UNIPROC) : no lock is taken in this build.  Returns
// TRUE when the TVar currently holds 'expected', FALSE otherwise.
//
// The outcome trace passes trec for %p and the outcome string for %s, so
// the format string and the argument list agree.
static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
  result = s -> current_value;
  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
210
#endif
211
212
213
214
215
216
217
218
219

#if defined(STM_CG_LOCK) /*........................................*/

// Read by the commit functions: when FALSE, stmCommitTransaction acquires
// every TVar in the trec and the check_read_only pass is skipped.
static const StgBool use_read_phase = FALSE;
// The coarse-grained STM lock: NULL when free, otherwise the trec that was
// passed to the lock_stm call currently holding it.
static volatile StgTRecHeader *smp_locked = NULL;

// lock_stm (STM_CG_LOCK) : spin until smp_locked is swapped from NULL to
// trec.  The loop ends on the first cas() call that yields NULL.
static void lock_stm(StgTRecHeader *trec) {
  for (;;) {
    if (cas(&smp_locked, NULL, trec) == NULL) {
      break;
    }
  }
  TRACE("%p : lock_stm()\n", trec);
}

222
223
224
225
226
// unlock_stm (STM_CG_LOCK) : release the coarse-grained lock by resetting
// smp_locked.  The caller must be the current holder (checked by ASSERT).
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()\n", trec);
  ASSERT (smp_locked == trec);
  smp_locked = NULL;
}
227

228
229
230
231
232
233
234
235
// lock_tvar (STM_CG_LOCK) : no per-TVar lock exists in this build.  The
// caller must already hold the STM lock (checked by ASSERT); the function
// returns the value currently stored in the TVar.
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                             StgTVar *s STG_UNUSED) {
  TRACE("%p : lock_tvar(%p)\n", trec, s);
  ASSERT (smp_locked == trec);
  return s -> current_value;
}
236

237
238
239
240
241
242
243
244
// unlock_tvar (STM_CG_LOCK) : no per-TVar lock exists in this build.  The
// caller must hold the STM lock (checked by ASSERT).  The value c is stored
// into the TVar only when force_update is set.
//
// Declared void, matching the STM_UNIPROC and STM_FG_LOCKS variants: the
// function produces no value, so a pointer return type with no return
// statement would hand callers an indeterminate result.
static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s STG_UNUSED,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
  ASSERT (smp_locked == trec);
  if (force_update) {
    s -> current_value = c;
  }
}

// cond_lock_tvar (STM_CG_LOCK) : no per-TVar lock exists in this build.  The
// caller must hold the STM lock (checked by ASSERT).  Returns TRUE when the
// TVar currently holds 'expected', FALSE otherwise.
//
// The outcome trace passes trec for %p and a string for %s, and derives the
// string from the same comparison that produces the return value.
static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                               StgTVar *s STG_UNUSED,
                               StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
  ASSERT (smp_locked == trec);
  result = s -> current_value;
  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
258
#endif
259
260
261
262
263
264
265

#if defined(STM_FG_LOCKS) /*...................................*/

// Read by the commit functions: when TRUE, stmCommitTransaction locks only
// the TVars it updates and then runs check_read_only on the others.
static const StgBool use_read_phase = TRUE;

// lock_stm (STM_FG_LOCKS) : there is no global lock in this build, so this
// only emits a trace line.
static void lock_stm(StgTRecHeader *trec STG_UNUSED)
{
  TRACE("%p : lock_stm()\n", trec);
}

268
269
270
// unlock_stm (STM_FG_LOCKS) : counterpart of lock_stm; only emits a trace line.
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()\n", trec);
}
271

272
273
274
275
276
277
278
279
280
281
282
// lock_tvar (STM_FG_LOCKS) : lock the TVar by swapping trec into its
// current_value field, and return the value that was displaced.
//
// A current_value whose info pointer is stg_TREC_HEADER_info means the TVar
// is locked by some trec, so the field is simply re-read until an ordinary
// value is seen; the cas is then retried until it succeeds against the
// value that was read.
static StgClosure *lock_tvar(StgTRecHeader *trec, 
                             StgTVar *s STG_UNUSED) {
  StgClosure *seen;
  TRACE("%p : lock_tvar(%p)\n", trec, s);
  for (;;) {
    seen = s -> current_value;
    if (GET_INFO(seen) == &stg_TREC_HEADER_info) {
      // Currently locked: read again.
      continue;
    }
    if (cas(&(s -> current_value), seen, trec) == seen) {
      break;
    }
  }
  return seen;
}
283

284
285
286
287
288
289
290
// unlock_tvar (STM_FG_LOCKS) : release the lock held by trec on the TVar by
// overwriting the lock entry with c.  The store is unconditional in this
// build; force_update is ignored.
static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update STG_UNUSED)
{
  TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
  // The TVar must currently be locked by this trec.
  ASSERT(s -> current_value == trec);
  s -> current_value = c;
}

293
294
295
296
297
298
299
300
// cond_lock_tvar (STM_FG_LOCKS) : try to lock the TVar by swapping trec into
// current_value, but only if the field currently holds 'expected'.  Returns
// TRUE when the value handed back by cas equals 'expected' (the lock was
// taken), FALSE otherwise.
//
// The outcome trace uses the same (result == expected) comparison as the
// return value; testing 'result' alone would report "success" for any
// non-NULL value found in the TVar.
static StgBool cond_lock_tvar(StgTRecHeader *trec, 
                              StgTVar *s,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
  result = cas(&(s -> current_value), expected, trec);
  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}
302
#endif
303
304
305
306
307
308

/*......................................................................*/

// Helper functions for thread blocking and unblocking

// park_tso : mark a thread as blocked on STM.  Under sched_mutex the TSO,
// which must currently be NotBlocked, gets why_blocked = BlockedOnSTM and
// its block_info.closure set to END_TSO_QUEUE.
static void park_tso(StgTSO *tso)
{
  ACQUIRE_LOCK(&sched_mutex);

  ASSERT(tso -> why_blocked == NotBlocked);
  tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE;
  tso -> why_blocked = BlockedOnSTM;

  RELEASE_LOCK(&sched_mutex);

  TRACE("park_tso on tso=%p\n", tso);
}

// unpark_tso : wake a thread that is blocked on STM.
//
// Threads keep being unparked for as long as they remain on a wait queue;
// it is up to the thread itself, once scheduled, to take itself off the
// queues if it decides to.  A TSO that is not BlockedOnSTM is left alone
// and the call is merely traced as spurious.
static void unpark_tso(StgTSO *tso) {
  if (tso -> why_blocked != BlockedOnSTM) {
    TRACE("spurious unpark_tso on tso=%p\n", tso);
    return;
  }

  TRACE("unpark_tso on tso=%p\n", tso);

  // Flip the state and queue the thread for running under sched_mutex.
  ACQUIRE_LOCK(&sched_mutex);
  tso -> why_blocked = NotBlocked;
  PUSH_ON_RUN_QUEUE(tso);
  RELEASE_LOCK(&sched_mutex);
}

// unpark_waiters_on : call unpark_tso for the waiting_tso of every entry on
// the TVar's wait queue, starting at first_wait_queue_entry and following
// next_queue_entry links.  The queue itself is not modified.
static void unpark_waiters_on(StgTVar *s) {
  StgTVarWaitQueue *entry;

  TRACE("unpark_waiters_on tvar=%p\n", s);

  entry = s -> first_wait_queue_entry;
  while (entry != END_STM_WAIT_QUEUE) {
    unpark_tso(entry -> waiting_tso);
    entry = entry -> next_queue_entry;
  }
}

/*......................................................................*/

// Helper functions for allocation and initialization

346
347
// new_stg_tvar_wait_queue : allocate a wait-queue entry via allocateLocal
// and record waiting_tso in it.  Only the header and the waiting_tso field
// are set here; the queue links are left for the caller to fill in.
static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg,
                                                 StgTSO *waiting_tso) {
  StgTVarWaitQueue *entry =
    (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue));

  SET_HDR (entry, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
  entry -> waiting_tso = waiting_tso;
  return entry;
}

355
// new_stg_trec_chunk : allocate an empty TRec chunk via allocateLocal.  The
// chunk has no predecessor (prev_chunk is END_STM_CHUNK_LIST) and its
// next_entry_idx is 0.
static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) {
  StgTRecChunk *chunk =
    (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk));

  SET_HDR (chunk, &stg_TREC_CHUNK_info, CCS_SYSTEM);
  chunk -> next_entry_idx = 0;
  chunk -> prev_chunk = END_STM_CHUNK_LIST;
  return chunk;
}

364
365
// new_stg_trec_header : allocate a TRec header with one fresh, empty chunk
// and link it to enclosing_trec.
//
// A top-level trec (enclosing_trec == NO_TREC) starts TREC_ACTIVE.  A nested
// trec copies the state of its enclosing trec, which must be TREC_ACTIVE or
// TREC_CONDEMNED.
static StgTRecHeader *new_stg_trec_header(StgRegTable *reg,
                                          StgTRecHeader *enclosing_trec) {
  StgTRecHeader *hdr;

  hdr = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader));
  SET_HDR (hdr, &stg_TREC_HEADER_info, CCS_SYSTEM);

  hdr -> enclosing_trec = enclosing_trec;
  hdr -> current_chunk = new_stg_trec_chunk(reg);

  if (enclosing_trec != NO_TREC) {
    ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
           enclosing_trec -> state == TREC_CONDEMNED);
    hdr -> state = enclosing_trec -> state;
  } else {
    hdr -> state = TREC_ACTIVE;
  }

  return hdr;
}

/*......................................................................*/

// Helper functions for managing waiting lists

388
389
390
// build_wait_queue_entries_for_trec : for every entry of a top-level, active
// trec, allocate a wait-queue entry for tso and push it on the front of the
// TVar's wait queue.
//
// The new queue entry is stashed in the TRec entry's new_value field, which
// is where remove_wait_queue_entries_for_trec later looks for it.
static void build_wait_queue_entries_for_trec(StgRegTable *reg,
                                      StgTSO *tso, 
                                      StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_ACTIVE);

  TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *tv = e -> tvar;
    StgTVarWaitQueue *old_head;
    StgTVarWaitQueue *fresh;

    TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, tv);
    ACQ_ASSERT(tv -> current_value == trec);
    NACQ_ASSERT(tv -> current_value == e -> expected_value);

    old_head = tv -> first_wait_queue_entry;
    fresh = new_stg_tvar_wait_queue(reg, tso);

    // Link the fresh entry in as the new head of the doubly-linked queue.
    fresh -> prev_queue_entry = END_STM_WAIT_QUEUE;
    fresh -> next_queue_entry = old_head;
    if (old_head != END_STM_WAIT_QUEUE) {
      old_head -> prev_queue_entry = fresh;
    }
    tv -> first_wait_queue_entry = fresh;

    e -> new_value = (StgClosure *) fresh;
  });
}

417
// remove_wait_queue_entries_for_trec : undo build_wait_queue_entries_for_trec.
//
// For every entry of a top-level trec in state TREC_WAITING or
// TREC_CONDEMNED, the wait-queue entry stored in the entry's new_value field
// is unlinked from its TVar's queue.  Each TVar is locked with lock_tvar
// around the unlink and released again with the value that lock_tvar saw.
static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
  ASSERT(trec -> state == TREC_WAITING ||
         trec -> state == TREC_CONDEMNED);

  TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *tv = e -> tvar;
    StgClosure *saw = lock_tvar(trec, tv);
    StgTVarWaitQueue *victim = (StgTVarWaitQueue *) (e -> new_value);
    StgTVarWaitQueue *after;
    StgTVarWaitQueue *before;

    TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, victim -> waiting_tso, tv);
    ACQ_ASSERT(tv -> current_value == trec);

    after = victim -> next_queue_entry;
    before = victim -> prev_queue_entry;

    if (after != END_STM_WAIT_QUEUE) {
      after -> prev_queue_entry = before;
    }
    if (before == END_STM_WAIT_QUEUE) {
      // The victim was the head of the queue.
      ASSERT (tv -> first_wait_queue_entry == victim);
      tv -> first_wait_queue_entry = after;
    } else {
      before -> next_queue_entry = after;
    }

    unlock_tvar(trec, tv, saw, FALSE);
  });
}
 
/*......................................................................*/
 
452
453
// get_new_entry : reserve the next free TRecEntry slot in trec t and return
// a pointer to it.  The fields of the returned entry are not initialised
// here; the caller fills them in.
//
// If the current chunk still has room, its next slot is handed out and
// next_entry_idx is bumped.  Otherwise a fresh chunk is allocated, chained
// in front of the full one through prev_chunk, made t's current_chunk, and
// its slot 0 is handed out.
//
// The sanity check on the chunk pointer runs before the chunk is read, so
// that it guards the dereference rather than following it.
static TRecEntry *get_new_entry(StgRegTable *reg,
                                StgTRecHeader *t) {
  TRecEntry *result;
  StgTRecChunk *c;
  int i;

  c = t -> current_chunk;
  ASSERT(c != END_STM_CHUNK_LIST);
  i = c -> next_entry_idx;

  if (i < TREC_CHUNK_NUM_ENTRIES) {
    // Continue to use current chunk
    result = &(c -> entries[i]);
    c -> next_entry_idx ++;
  } else {
    // Current chunk is full: allocate a fresh one
    StgTRecChunk *nc;
    nc = new_stg_trec_chunk(reg);
    nc -> prev_chunk = c;
    nc -> next_entry_idx = 1;   // slot 0 is the one being handed out
    t -> current_chunk = nc;
    result = &(nc -> entries[0]);
  }

  return result;
}

/*......................................................................*/

481
482
static void merge_update_into(StgRegTable *reg,
                              StgTRecHeader *t,
483
484
                              StgTVar *tvar,
                              StgClosure *expected_value,
485
                              StgClosure *new_value) {
486
487
488
489
490
491
492
493
494
  int found;
  
  // Look for an entry in this trec
  found = FALSE;
  FOR_EACH_ENTRY(t, e, {
    StgTVar *s;
    s = e -> tvar;
    if (s == tvar) {
      found = TRUE;
495
496
497
498
499
500
      if (e -> expected_value != expected_value) {
        // Must abort if the two entries start from different values
        TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
              t, tvar, e -> expected_value, expected_value);
        t -> state = TREC_CONDEMNED;
      } 
501
502
503
504
505
506
507
508
      e -> new_value = new_value;
      BREAK_FOR_EACH;
    }
  });

  if (!found) {
    // No entry so far in this trec
    TRecEntry *ne;
509
    ne = get_new_entry(reg, t);
510
511
512
513
514
515
516
517
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = new_value;
  }
}

/*......................................................................*/

518
519
520
521
522
// entry_is_update : TRUE when the entry's new_value differs from its
// expected_value.
static StgBool entry_is_update(TRecEntry *e) {
  return (e -> expected_value != e -> new_value);
}
523

524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
// entry_is_read_only : TRUE when the entry's new_value is the same as its
// expected_value (the exact complement of entry_is_update).
static StgBool entry_is_read_only(TRecEntry *e) {
  return (e -> expected_value == e -> new_value);
}

// tvar_is_locked : TRUE when the TVar's current_value field is the trec h
// itself, which is how a lock held by h is represented.
static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
  StgClosure *held = s -> current_value;
  return (held == (StgClosure *) h);
}

// revert_ownership : release a lock on a TVar, storing back
// the value that it held when the lock was acquired.  "revert_all"
// is set in stmWait and stmReWait when we acquired locks on all of 
// the TVars involved.  "revert_all" is not set in commit operations
// where we don't lock TVars that have been read from but not updated.

// In STM_FG_LOCKS builds: for each entry of trec that is either an update or
// (when revert_all is set) any entry at all, release the TVar if trec holds
// its lock, writing the entry's expected_value back.  TVars not locked by
// trec are skipped.  In every other build this function does nothing.
static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
                             StgBool revert_all STG_UNUSED) {
#if defined(STM_FG_LOCKS) 
  FOR_EACH_ENTRY(trec, e, {
    StgTVar *tv = e -> tvar;
    // Short-circuit order is kept: the lock test only runs for entries
    // that are candidates for reverting.
    if ((revert_all || entry_is_update(e)) && tvar_is_locked(tv, trec)) {
      unlock_tvar(trec, tv, e -> expected_value, TRUE);
    }
  });
#endif
}
558

559
560
/*......................................................................*/

561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
// validate_and_acquire_ownership : this performs the twin functions
// of checking that the TVars referred to by entries in trec hold the
// expected values and:
// 
//   - locking the TVar (on updated TVars during commit, or all TVars
//     during wait)
//
//   - recording the identity of the TRec who wrote the value seen in the
//     TVar (on non-updated TVars during commit).  These values are 
//     stashed in the TRec entries and are then checked in check_read_only
//     to ensure that an atomic snapshot of all of these locations has been
//     seen.

// Parameters:
//   trec             - the transaction record to validate; must be ACTIVE,
//                      WAITING or CONDEMNED.
//   acquire_all      - when set, every entry's TVar is locked; otherwise only
//                      entries that are updates are locked and the others are
//                      checked by reading.
//   retain_ownership - when set and validation succeeds, the locks taken are
//                      kept on return; in every other case revert_ownership
//                      is called before returning.
//
// Returns TRUE if trec is not condemned and every entry matched, FALSE
// otherwise.  May also return FALSE spuriously when shake() fires, before
// anything has been examined or locked.
static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
                                               int acquire_all,
                                               int retain_ownership) {
  StgBool ok;

  if (shake()) {
    TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
    return FALSE;
  }

  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_WAITING) ||
          (trec -> state == TREC_CONDEMNED));

  ok = (trec -> state != TREC_CONDEMNED);
  if (ok) {
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *tv;
      tv = e -> tvar;
      if (acquire_all || entry_is_update(e)) {
        TRACE("%p : trying to acquire %p\n", trec, tv);
        if (!cond_lock_tvar(trec, tv, e -> expected_value)) {
          TRACE("%p : failed to acquire %p\n", trec, tv);
          ok = FALSE;
          BREAK_FOR_EACH;
        }
      } else {
        TRACE("%p : will need to check %p\n", trec, tv);
        if (tv -> current_value != e -> expected_value) {
          TRACE("%p : doesn't match\n", trec);
          ok = FALSE;
          BREAK_FOR_EACH;
        }
        // Record who last updated the TVar, then look at the value a second
        // time: the order value / last_update_by / value is deliberate.
        e -> saw_update_by = tv -> last_update_by;
        if (tv -> current_value == e -> expected_value) {
          TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
        } else {
          TRACE("%p : doesn't match (race)\n", trec);
          ok = FALSE;
          BREAK_FOR_EACH;
        }
      }
    });
  }

  // Keep the locks only if validation succeeded and the caller asked for it.
  if (!(ok && retain_ownership)) {
    revert_ownership(trec, acquire_all);
  }

  return ok;
}

625
626
627
628
629
630
631
632
633
634
// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec.  This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
// validate_and_acquire_ownership.  If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
// The paper "Concurrent programming without locks" (under submission), or
// Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
// this kind of algorithm.
635

636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
// Walks the entries of trec; for each read-only entry, compares the TVar's
// last_update_by with the saw_update_by value stored in the entry.  Returns
// FALSE at the first mismatch, TRUE if there is none.  Entries that are
// updates are not examined.
static StgBool check_read_only(StgTRecHeader *trec) {
  StgBool snapshot_ok = TRUE;

  FOR_EACH_ENTRY(trec, e, {
    StgTVar *tv = e -> tvar;
    if (entry_is_read_only(e)) {
      TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, tv, e -> saw_update_by);
      if (tv -> last_update_by != e -> saw_update_by) {
        TRACE("%p : mismatch\n", trec);
        snapshot_ok = FALSE;
        BREAK_FOR_EACH;
      }
    }
  });

  return snapshot_ok;
}
655
656
657
658
659


/************************************************************************/

void stmPreGCHook() {
660
  lock_stm(NO_TREC);
661
  TRACE("stmPreGCHook\n");
662
  unlock_stm(NO_TREC);
663
664
665
666
667
668
669
670
671
672
}

/************************************************************************/

// initSTM : STM start-up hook.  It only traces the value of NO_TREC.
void initSTM() {
  TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
}

/*......................................................................*/

673
674
// stmStartTransaction : create a transaction record nested inside 'outer'
// (or a top-level one when outer is NO_TREC) and return it.  All the work is
// done by new_stg_trec_header.
StgTRecHeader *stmStartTransaction(StgRegTable *reg,
                                   StgTRecHeader *outer) {
  StgTRecHeader *fresh;

  TRACE("%p : stmStartTransaction\n", outer);
  fresh = new_stg_trec_header(reg, outer);
  TRACE("%p : stmStartTransaction()=%p\n", outer, fresh);

  return fresh;
}

/*......................................................................*/

// stmAbortTransaction : move trec to TREC_ABORTED under the STM lock.
//
// trec must be ACTIVE, WAITING or CONDEMNED.  A WAITING trec (which must be
// top-level) is first taken off the wait queues of its TVars.
void stmAbortTransaction(StgTRecHeader *trec) {
  StgBool was_waiting;

  TRACE("%p : stmAbortTransaction\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_WAITING) ||
          (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  was_waiting = (trec -> state == TREC_WAITING);
  if (was_waiting) {
    ASSERT (trec -> enclosing_trec == NO_TREC);
    TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
    remove_wait_queue_entries_for_trec(trec);
  }
  trec -> state = TREC_ABORTED;

  unlock_stm(trec);

  TRACE("%p : stmAbortTransaction done\n", trec);
}

/*......................................................................*/

705
// stmCondemnTransaction : move trec to TREC_CONDEMNED under the STM lock.
//
// trec must be ACTIVE, WAITING or CONDEMNED.  A WAITING trec (which must be
// top-level) is first taken off the wait queues of its TVars.
void stmCondemnTransaction(StgTRecHeader *trec) {
  StgBool was_waiting;

  TRACE("%p : stmCondemnTransaction\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_WAITING) ||
          (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  was_waiting = (trec -> state == TREC_WAITING);
  if (was_waiting) {
    ASSERT (trec -> enclosing_trec == NO_TREC);
    TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
    remove_wait_queue_entries_for_trec(trec);
  }
  trec -> state = TREC_CONDEMNED;

  unlock_stm(trec);

  TRACE("%p : stmCondemnTransaction done\n", trec);
}

/*......................................................................*/

726
727
// stmGetEnclosingTRec : return the enclosing_trec field of trec.
StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
  StgTRecHeader *parent;

  TRACE("%p : stmGetEnclosingTRec\n", trec);
  parent = trec -> enclosing_trec;
  TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, parent);

  return parent;
}

/*......................................................................*/

736
737
738
739
740
// stmValidateNestOfTransactions : validate trec and every trec enclosing it.
//
// Each level is validated with all TVars acquired and then released again
// (acquire_all = TRUE, retain_ownership = FALSE).  Every level is visited
// even after one has failed.  If any level failed and trec is not WAITING,
// trec is marked TREC_CONDEMNED.  Returns TRUE only if all levels validated.
StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
  StgTRecHeader *level;
  StgBool all_valid = TRUE;

  TRACE("%p : stmValidateNestOfTransactions\n", trec);
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) || 
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  for (level = trec; level != NO_TREC; level = level -> enclosing_trec) {
    // Non-short-circuit on purpose: the call is made for every level.
    all_valid &= validate_and_acquire_ownership(level, TRUE, FALSE);
  }

  if (!all_valid && trec -> state != TREC_WAITING) {
    trec -> state = TREC_CONDEMNED; 
  }

  unlock_stm(trec);

  TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, all_valid);
  return all_valid;
}

/*......................................................................*/

767
// stmCommitTransaction : try to commit a top-level transaction.
//
// trec must be top-level and ACTIVE or CONDEMNED.  Validation locks the
// updated TVars (all TVars when use_read_phase is FALSE) and keeps the locks.
// With use_read_phase set, the read-only entries are then re-checked with
// check_read_only.  On success each updated TVar has its waiters woken, its
// last_update_by set to trec and its new value written while it is unlocked.
// If the read check fails, the locks are reverted.
//
// Returns TRUE if the transaction committed, FALSE otherwise.
StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) {
  int committed;

  TRACE("%p : stmCommitTransaction()\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  committed = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
  if (committed) {
    // We now know that all the updated locations hold their expected values.
    ASSERT (trec -> state == TREC_ACTIVE);

    if (use_read_phase) {
      TRACE("%p : doing read check\n", trec);
      committed = check_read_only(trec);
    }

    if (!committed) {
      revert_ownership(trec, FALSE);
    } else {
      // We now know that all of the read-only locations held their expected
      // values at the end of the call to validate_and_acquire_ownership.
      // This forms the linearization point of the commit.
      TRACE("%p : read-check succeeded\n", trec);

      FOR_EACH_ENTRY(trec, e, {
        StgTVar *tv = e -> tvar;
        if (entry_is_update(e)) {
          // Entry is an update: write the value back to the TVar, unlocking
          // it if necessary.
          ACQ_ASSERT(tvar_is_locked(tv, trec));
          TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, tv);
          unpark_waiters_on(tv);
          tv -> last_update_by = trec;
          unlock_tvar(trec, tv, e -> new_value, TRUE);
        }
        ACQ_ASSERT(!tvar_is_locked(tv, trec));
      });
    }
  }

  unlock_stm(trec);

  TRACE("%p : stmCommitTransaction()=%d\n", trec, committed);

  return committed;
}

/*......................................................................*/

821
822
// stmCommitNestedTransaction : try to commit a nested transaction into its
// enclosing transaction record.
//
// trec must be nested (its enclosing_trec is not NO_TREC) and ACTIVE or
// CONDEMNED.  Validation locks the TVars that trec updates and keeps the
// locks.  With use_read_phase set, the read-only entries are then re-checked
// with check_read_only.
//
//   - On success every entry is merged into the enclosing trec with
//     merge_update_into, and each updated TVar is unlocked with its expected
//     value (force_update FALSE: the nested commit publishes nothing).
//   - If the read check fails, the locks taken during validation are
//     released again with revert_ownership, exactly as stmCommitTransaction
//     does.  Without this the failure path would return with the updated
//     TVars still locked by trec.
//
// Returns TRUE if the nested transaction was merged, FALSE otherwise.
StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) {
  StgTRecHeader *et;
  int result;
  ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
  TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
  ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  result = validate_and_acquire_ownership(trec, FALSE, TRUE);
  if (result) {
    // We now know that all the updated locations hold their expected values.

    if (use_read_phase) {
      TRACE("%p : doing read check\n", trec);
      result = check_read_only(trec);
    }

    if (result) {
      // We now know that all of the read-only locations held their expected
      // values at the end of the call to validate_and_acquire_ownership.
      // This forms the linearization point of the commit.
      TRACE("%p : read-check succeeded\n", trec);
      FOR_EACH_ENTRY(trec, e, {
        // Merge each entry into the enclosing transaction record, release all
        // locks.

        StgTVar *s;
        s = e -> tvar;
        if (entry_is_update(e)) {
          unlock_tvar(trec, s, e -> expected_value, FALSE);
        }
        merge_update_into(reg, et, s, e -> expected_value, e -> new_value);
        ACQ_ASSERT(s -> current_value != trec);
      });
    } else {
      // Read check failed: give back the locks taken during validation.
      revert_ownership(trec, FALSE);
    }
  } 

  unlock_stm(trec);

  TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);

  return result;
}

/*......................................................................*/

873
// stmWait : put tso to sleep on the TVars recorded in trec, provided trec is
// still valid.
//
// trec must be top-level and ACTIVE or CONDEMNED.  Validation locks every
// TVar in trec and keeps the locks.  If it succeeds, tso is added to the
// wait queues, parked, and trec becomes TREC_WAITING before the locks are
// released.  If it fails nothing is queued and the thread will have to retry
// the transaction.
//
// Returns TRUE if the thread is now waiting, FALSE if validation failed.
StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) {
  int valid;

  TRACE("%p : stmWait(%p)\n", trec, tso);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  valid = validate_and_acquire_ownership(trec, TRUE, TRUE);
  if (valid) {
    // Put ourselves to sleep.  We retain locks on all the TVars involved
    // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
    // in the TSO, (c) TREC_WAITING in the Trec.
    build_wait_queue_entries_for_trec(reg, tso, trec);
    park_tso(tso);
    trec -> state = TREC_WAITING;

    // As soon as we start releasing ownership, another thread may find us
    // and wake us up.  This may happen even before we have finished
    // releasing ownership.
    revert_ownership(trec, TRUE);
  }

  unlock_stm(trec);

  TRACE("%p : stmWait(%p)=%d\n", trec, tso, valid);
  return valid;
}

/*......................................................................*/

909
// stmReWait : re-validate the waiting transaction of tso (taken from
// tso->trec).
//
// trec must be top-level and WAITING or CONDEMNED.  Validation locks every
// TVar in trec and keeps the locks on success.
//
//   - Still valid: the trec is already on the wait queues, so the thread is
//     just parked again and the locks are released.  Returns TRUE.
//   - No longer valid: unless trec is CONDEMNED, its wait-queue entries are
//     removed.  Returns FALSE.
StgBool stmReWait(StgTSO *tso) {
  int still_valid;
  StgTRecHeader *trec = tso->trec;

  TRACE("%p : stmReWait\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
  ASSERT ((trec -> state == TREC_WAITING) || 
          (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  still_valid = validate_and_acquire_ownership(trec, TRUE, TRUE);
  TRACE("%p : validation %s\n", trec, still_valid ? "succeeded" : "failed");

  if (!still_valid) {
    // The transaction has become invalid.  We can now remove it from the
    // wait queues.
    if (trec -> state != TREC_CONDEMNED) {
      remove_wait_queue_entries_for_trec (trec);
    }
  } else {
    // The transaction remains valid -- do nothing because it is already on
    // the wait queues
    ASSERT (trec -> state == TREC_WAITING);
    park_tso(tso);
    revert_ownership(trec, TRUE);
  }

  unlock_stm(trec);

  TRACE("%p : stmReWait()=%d\n", trec, still_valid);
  return still_valid;
}

/*......................................................................*/

944
945
/*
 * get_entry_for: look for an entry recording `tvar`, starting in `trec`
 * and then walking outwards through the enclosing_trec links until an
 * entry is found or NO_TREC is reached.
 *
 * Returns the first matching entry, or NULL if no level has one.  When a
 * match is found and `in` is non-NULL, *in is set to the TRec that holds
 * the entry; *in is left untouched when nothing is found.
 */
static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
  StgTRecHeader *level = trec;
  TRecEntry *found = NULL;

  TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
  ASSERT(trec != NO_TREC);

  // The starting level is always scanned; outer levels only while nothing
  // has been found yet.
  do {
    FOR_EACH_ENTRY(level, candidate, {
      if (candidate -> tvar == tvar) {
        found = candidate;
        if (in != NULL) {
          *in = level;
        }
        BREAK_FOR_EACH;
      }
    });
    level = level -> enclosing_trec;
  } while (level != NO_TREC && found == NULL);

  return found;
}

/*
 * read_current_value: fetch tvar->current_value.
 *
 * Under STM_FG_LOCKS the field is re-read for as long as the value seen
 * has the stg_TREC_HEADER_info info pointer, so the value returned in that
 * configuration never carries that info pointer.  `trec` is used only for
 * tracing.
 */
static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
  StgClosure *seen = tvar -> current_value;

#ifdef STM_FG_LOCKS
  // Spin until the TVar no longer holds a TRec header.
  for (; GET_INFO(seen) == &stg_TREC_HEADER_info; seen = tvar -> current_value) {
    TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, seen);
  }
#endif

  TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, seen);
  return seen;
}

/*......................................................................*/

/*
 * stmReadTVar: read `tvar` on behalf of transaction `trec`, which must be
 * TREC_ACTIVE or TREC_CONDEMNED.
 *
 * Three cases, decided by get_entry_for():
 *   - no entry at any level: the value comes from read_current_value() and
 *     is logged in a fresh entry as both expected and new value;
 *   - entry already in `trec`: its new_value is returned, nothing is added;
 *   - entry in an enclosing TRec: its expected_value and new_value are
 *     copied into a fresh entry in `trec`, and that new_value is returned.
 *
 * Fresh entries are obtained with get_new_entry(reg, trec).
 */
StgClosure *stmReadTVar(StgRegTable *reg,
                        StgTRecHeader *trec,
                        StgTVar *tvar) {
  StgTRecHeader *owner;
  TRecEntry *hit;
  TRecEntry *fresh;
  StgClosure *value;

  TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> state == TREC_ACTIVE ||
          trec -> state == TREC_CONDEMNED);

  hit = get_entry_for(trec, tvar, &owner);

  if (hit == NULL) {
    // First access anywhere in the nest: log what is currently in the TVar
    value = read_current_value(trec, tvar);
    fresh = get_new_entry(reg, trec);
    fresh -> tvar = tvar;
    fresh -> expected_value = value;
    fresh -> new_value = value;
  } else if (owner == trec) {
    // Already logged at this level
    value = hit -> new_value;
  } else {
    // Logged by an enclosing transaction: mirror that entry at this level
    fresh = get_new_entry(reg, trec);
    fresh -> tvar = tvar;
    fresh -> expected_value = hit -> expected_value;
    fresh -> new_value = hit -> new_value;
    value = fresh -> new_value;
  }

  TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, value);
  return value;
}

/*......................................................................*/

void stmWriteTVar(StgRegTable *reg,
                  StgTRecHeader *trec,
1026
1027
		  StgTVar *tvar, 
		  StgClosure *new_value) {
1028
1029

  StgTRecHeader *entry_in;
1030
  TRecEntry *entry = NULL;
1031
  TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1032
1033
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> state == TREC_ACTIVE || 
1034
          trec -> state == TREC_CONDEMNED);
1035

1036
  entry = get_entry_for(trec, tvar, &entry_in);
1037

1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      entry -> new_value = new_value;
    } else {
      // Entry found in another trec
      TRecEntry *new_entry = get_new_entry(reg, trec);
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = new_value;
    } 
1049
1050
  } else {
    // No entry found
1051
1052
1053
1054
1055
    StgClosure *current_value = read_current_value(trec, tvar);
    TRecEntry *new_entry = get_new_entry(reg, trec);
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = new_value;
1056
1057
  }

1058
  TRACE("%p : stmWriteTVar done\n", trec);
1059
1060
1061
1062
}

/*......................................................................*/