STM.c 34.5 KB
Newer Older
1
2
/* -----------------------------------------------------------------------------
 *
3
 * (c) The GHC Team 1998-2005
4
5
6
 * 
 * STM implementation.
 *
7
8
 * Overview
 * --------
9
 *
10
11
12
13
14
15
 * See the PPoPP 2005 paper "Composable memory transactions".  In summary,
 * each transaction has a TRec (transaction record) holding entries for each of the
 * TVars (transactional variables) that it has accessed.  Each entry records
 * (a) the TVar, (b) the expected value seen in the TVar, (c) the new value that
 * the transaction wants to write to the TVar, (d) during commit, the identity of
 * the TRec that wrote the expected value.
16
 *
17
18
19
20
21
22
23
 * Separate TRecs are used for each level in a nest of transactions.  This allows
 * a nested transaction to be aborted without condemning its enclosing transactions.
 * This is needed in the implementation of catchRetry.  Note that the "expected value"
 * in a nested transaction's TRec is the value expected to be *held in memory* if
 * the transaction commits -- not the "new value" stored in one of the enclosing
 * transactions.  This means that validation can be done without searching through
 * a nest of TRecs.
24
 *
25
26
 * Concurrency control
 * -------------------
27
 *
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
 * Three different concurrency control schemes can be built according to the settings
 * in STM.h:
 * 
 * STM_UNIPROC assumes that the caller serialises invocations on the STM interface.
 * In the Haskell RTS this means it is suitable only for non-SMP builds.
 *
 * STM_CG_LOCK uses coarse-grained locking -- a single 'stm lock' is acquired during
 * an invocation on the STM interface.  Note that this does not mean that 
 * transactions are simply serialized -- the lock is only held *within* the 
 * implementation of stmCommitTransaction, stmWait etc.
 *
 * STM_FG_LOCKS uses fine-grained locking -- locking is done on a per-TVar basis
 * and, when committing a transaction, no locks are acquired for TVars that have
 * been read but not updated.
 *
 * Concurrency control is implemented in the functions:
 *
 *    lock_stm
 *    unlock_stm
 *    lock_tvar / cond_lock_tvar
 *    unlock_tvar
 *
 * The choice between STM_UNIPROC / STM_CG_LOCK / STM_FG_LOCKS affects the 
 * implementation of these functions.  
 *
 * lock_stm & unlock_stm are straightforward : they acquire a simple spin-lock
 * using STM_CG_LOCK, and otherwise they are no-ops.
55
 *
56
57
58
59
60
 * lock_tvar / cond_lock_tvar and unlock_tvar are more complex because they
 * have other effects (present in STM_UNIPROC and STM_CG_LOCK builds) as well
 * as the actual business of manipulating a lock (present only in STM_FG_LOCKS
 * builds).  This is because locking a TVar is implemented by writing the lock
 * holder's TRec into the TVar's current_value field:
61
 *
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
 *   lock_tvar - lock a specified TVar (STM_FG_LOCKS only), returning the value 
 *               it contained.
 *
 *   cond_lock_tvar - lock a specified TVar (STM_FG_LOCKS only) if it 
 *               contains a specified value.  Return TRUE if this succeeds,
 *               FALSE otherwise.
 *
 *   unlock_tvar - release the lock on a specified TVar (STM_FG_LOCKS only),
 *               storing a specified value in place of the lock entry.
 *
 * Using these operations, the typical pattern of a commit/validate/wait operation
 * is to (a) lock the STM, (b) lock all the TVars being updated, (c) check that
 * the TVars that were only read from still contain their expected values,
 * (d) release the locks on the TVars, writing updates to them in the case of a
 * commit, (e) unlock the STM.
 *
 * Queues of waiting threads hang off the first_wait_queue_entry field of each
 * TVar.  This may only be manipulated when holding that TVar's lock.  In
 * particular, when a thread is putting itself to sleep, it mustn't release
 * the TVar's lock until it has added itself to the wait queue and marked its
 * TSO as BlockedOnSTM -- this makes sure that other threads will know to wake it.
83
84
85
86
87
88
89
90
 *
 * ---------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "Schedule.h"
91
#include "SMP.h"
92
93
94
95
96
97
#include "STM.h"
#include "Storage.h"

#include <stdlib.h>
#include <stdio.h>

/* Boolean values returned by the StgBool helpers in this file. */
#define TRUE 1
#define FALSE 0

// ACQ_ASSERT is used for assertions which are only required for SMP builds with
// fine-grained locking. 

#if defined(STM_FG_LOCKS)
#define ACQ_ASSERT(_X) ASSERT(_X)
#define NACQ_ASSERT(_X) /*Nothing*/
#else
#define ACQ_ASSERT(_X) /*Nothing*/
#define NACQ_ASSERT(_X) ASSERT(_X)
#endif

/*......................................................................*/

// If SHAKE is defined then validation will sometime spuriously fail.  They helps test
// unusualy code paths if genuine contention is rare
116
117
118
119
120
121
122
123
124
125
126
127
128
129

#if defined(DEBUG)
#define SHAKE
#define TRACE(_x...) IF_DEBUG(stm, debugBelch ( _x ))
#else
#define TRACE(_x...) /*Nothing*/
#endif

#ifdef SHAKE
static const int do_shake = TRUE;
#else
static const int do_shake = FALSE;
#endif
static int shake_ctr = 0;
130
static int shake_lim = 1;
131
132
133

static int shake(void) {
  if (do_shake) {
134
135
136
    if (((shake_ctr++) % shake_lim) == 0) {
      shake_ctr = 1;
      shake_lim ++;
137
138
139
140
141
142
143
144
145
146
147
148
149
      return TRUE;
    } 
    return FALSE;
  } else {
    return FALSE;
  }
}

/*......................................................................*/

// Helper macros for iterating over entries within a transaction
// record.
//
// FOR_EACH_ENTRY(_t, _x, CODE) runs CODE once for every entry in the TRec _t,
// with _x bound to a TRecEntry * for that entry.  Chunks are visited from the
// current (newest) chunk back along prev_chunk.  Only the first
// next_entry_idx entries of the current chunk are in use; every older chunk
// is full (get_new_entry only starts a new chunk once the current one is
// full).  CODE may use BREAK_FOR_EACH to leave the loop early.
//
// exit_for_each is a label, and labels have function scope in C, so the
// macro can be expanded at most once per function.  The "if (FALSE) goto"
// only exists to keep the label referenced (avoiding unused-label warnings).
// The chunk limit is printed through an explicit unsigned long cast so that
// the format specifier matches its argument whatever the width of StgWord.

#define FOR_EACH_ENTRY(_t,_x,CODE) do {                                         \
  StgTRecHeader *_fe_trec = (_t);                                               \
  StgTRecChunk *_fe_chunk = _fe_trec -> current_chunk;                          \
  StgWord _fe_limit = _fe_chunk -> next_entry_idx;                              \
  TRACE("%p : FOR_EACH_ENTRY, current_chunk=%p limit=%lu\n",                    \
        _fe_trec, _fe_chunk, (unsigned long) _fe_limit);                        \
  while (_fe_chunk != END_STM_CHUNK_LIST) {                                     \
    StgWord _fe_i;                                                              \
    for (_fe_i = 0; _fe_i < _fe_limit; _fe_i ++) {                              \
      TRecEntry *_x = &(_fe_chunk -> entries[_fe_i]);                           \
      do { CODE } while (0);                                                    \
    }                                                                           \
    _fe_chunk = _fe_chunk -> prev_chunk;                                        \
    _fe_limit = TREC_CHUNK_NUM_ENTRIES;                                         \
  }                                                                             \
 exit_for_each:                                                                 \
  if (FALSE) goto exit_for_each;                                                \
} while (0)

// BREAK_FOR_EACH : leave the enclosing FOR_EACH_ENTRY immediately.
#define BREAK_FOR_EACH goto exit_for_each
     
/*......................................................................*/

// IF_STM_UNIPROC / IF_STM_CG_LOCK / IF_STM_FG_LOCKS(__X) expand to the
// statements __X only in a build that uses the matching concurrency-control
// scheme.  By default each one is an empty statement; the scheme-specific
// section below redefines the macro for the scheme in use.
#define IF_STM_UNIPROC(__X)  do { /* not this scheme */ } while (0)
#define IF_STM_CG_LOCK(__X)  do { /* not this scheme */ } while (0)
#define IF_STM_FG_LOCKS(__X) do { /* not this scheme */ } while (0)
#if defined(STM_UNIPROC) /*........................................*/

// STM_UNIPROC: the caller serialises every STM operation, so neither the STM
// lock nor the per-TVar locks need to do any work.  "Locking" a TVar simply
// reads its current value, and "unlocking" it optionally stores a new one.

#undef IF_STM_UNIPROC
#define IF_STM_UNIPROC(__X)  do { __X } while (0)

static const StgBool use_read_phase = FALSE;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()\n", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()\n", trec);
}

// lock_tvar : return the value currently held in s.
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                             StgTVar *s STG_UNUSED) {
  TRACE("%p : lock_tvar(%p)\n", trec, s);
  return s -> current_value;
}

// unlock_tvar : store c into s, but only when force_update is set.
static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s STG_UNUSED,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p)\n", trec, s);
  if (!force_update) {
    return;
  }
  s -> current_value = c;
}

// cond_lock_tvar : report whether s currently holds expected.
static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                              StgTVar *s STG_UNUSED,
                              StgClosure *expected) {
  StgClosure *seen;
  StgBool matches;
  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
  seen = s -> current_value;
  matches = (seen == expected);
  TRACE("%p : %s\n", trec, matches ? "success" : "failure");
  return matches;
}

#endif

#if defined(STM_CG_LOCK) /*........................................*/

// STM_CG_LOCK: coarse-grained locking.  A single spin-lock (smp_locked) is
// held by the TRec performing an STM operation.  The per-TVar operations only
// assert that the caller holds it and then read or write current_value
// directly; no per-TVar lock is ever taken.

#undef IF_STM_CG_LOCK
#define IF_STM_CG_LOCK(__X)  do { __X } while (0)

static const StgBool use_read_phase = FALSE;

// The TRec currently holding the STM lock, or NULL when the lock is free.
static volatile StgTRecHeader *smp_locked = NULL;

// lock_stm : spin until the STM lock can be taken on behalf of trec.
static void lock_stm(StgTRecHeader *trec) {
  while (cas(&smp_locked, NULL, trec) != NULL) { }
  TRACE("%p : lock_stm()\n", trec);
}

// unlock_stm : release the STM lock, which must be held by trec.
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()\n", trec);
  ASSERT (smp_locked == trec);
  smp_locked = NULL;
}

// lock_tvar : return the value held in s (trec must hold the STM lock).
static StgClosure *lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)\n", trec, s);
  ASSERT (smp_locked == trec);
  result = s -> current_value;
  return result;
}

// unlock_tvar : store c into s when force_update is set (trec must hold the
// STM lock).  Returns nothing, matching unlock_tvar in the other builds.
static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s STG_UNUSED,
                        StgClosure *c,
                        StgBool force_update) {
  TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
  ASSERT (smp_locked == trec);
  if (force_update) {
    s -> current_value = c;
  }
}

// cond_lock_tvar : report whether s currently holds expected (trec must hold
// the STM lock).
static StgBool cond_lock_tvar(StgTRecHeader *trec STG_UNUSED, 
                               StgTVar *s STG_UNUSED,
                               StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
  ASSERT (smp_locked == trec);
  result = s -> current_value;
  // Trace the outcome of the comparison, with one argument per conversion.
  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}

#endif

#if defined(STM_FG_LOCKS) /*...................................*/

// STM_FG_LOCKS: fine-grained locking.  A TVar is locked by compare-and-
// swapping the locking TRec into its current_value field, so a TVar whose
// current_value is a TRec header is locked.  The STM lock itself is a no-op.

#undef IF_STM_FG_LOCKS
#define IF_STM_FG_LOCKS(__X) do { __X } while (0)

static const StgBool use_read_phase = TRUE;

static void lock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : lock_stm()\n", trec);
}

static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
  TRACE("%p : unlock_stm()\n", trec);
}

// lock_tvar : spin until s is not locked, then lock it for trec.  Returns the
// value s held immediately before trec locked it.
static StgClosure *lock_tvar(StgTRecHeader *trec, 
                             StgTVar *s STG_UNUSED) {
  StgClosure *result;
  TRACE("%p : lock_tvar(%p)\n", trec, s);
  do {
    do {
      result = s -> current_value;
    } while (GET_INFO(result) == &stg_TREC_HEADER_info);
  } while (cas(&(s -> current_value), result, trec) != result);
  return result;
}

// unlock_tvar : release trec's lock on s by storing c into it.  force_update
// is ignored here: releasing the lock always writes current_value.
static void unlock_tvar(StgTRecHeader *trec STG_UNUSED,
                        StgTVar *s,
                        StgClosure *c,
                        StgBool force_update STG_UNUSED) {
  TRACE("%p : unlock_tvar(%p, %p)\n", trec, s, c);
  ASSERT(s -> current_value == (StgClosure *) trec);
  s -> current_value = c;
}

// cond_lock_tvar : lock s for trec only if it currently holds expected.
// Returns TRUE if the lock was taken.
static StgBool cond_lock_tvar(StgTRecHeader *trec, 
                              StgTVar *s,
                              StgClosure *expected) {
  StgClosure *result;
  TRACE("%p : cond_lock_tvar(%p, %p)\n", trec, s, expected);
  result = cas(&(s -> current_value), expected, trec);
  // cas returns the value it found; the lock was taken iff that was expected.
  TRACE("%p : %s\n", trec, (result == expected) ? "success" : "failure");
  return (result == expected);
}

#endif

/*......................................................................*/

// Helper functions for thread blocking and unblocking

// park_tso : record that tso is now blocked on STM.  The TSO must not already
// be blocked; its block_info is cleared to END_TSO_QUEUE.
static void park_tso(StgTSO *tso) {
  ASSERT(tso->why_blocked == NotBlocked);
  tso->why_blocked = BlockedOnSTM;
  tso->block_info.closure = (StgClosure *) END_TSO_QUEUE;
  TRACE("park_tso on tso=%p\n", tso);
}

325
static void unpark_tso(Capability *cap, StgTSO *tso) {
326
327
328
329
330
  // We will continue unparking threads while they remain on one of the wait
  // queues: it's up to the thread itself to remove it from the wait queues
  // if it decides to do so when it is scheduled.
  if (tso -> why_blocked == BlockedOnSTM) {
    TRACE("unpark_tso on tso=%p\n", tso);
331
    unblockOne(cap,tso);
332
333
334
335
336
  } else {
    TRACE("spurious unpark_tso on tso=%p\n", tso);
  }
}

337
static void unpark_waiters_on(Capability *cap, StgTVar *s) {
338
339
340
341
342
  StgTVarWaitQueue *q;
  TRACE("unpark_waiters_on tvar=%p\n", s);
  for (q = s -> first_wait_queue_entry; 
       q != END_STM_WAIT_QUEUE; 
       q = q -> next_queue_entry) {
343
    unpark_tso(cap, q -> waiting_tso);
344
345
346
347
348
349
350
  }
}

/*......................................................................*/

// Helper functions for allocation and initialization

351
static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap,
352
                                                 StgTSO *waiting_tso) {
353
  StgTVarWaitQueue *result;
354
  result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue));
355
  SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM);
356
357
358
359
  result -> waiting_tso = waiting_tso;
  return result;
}

360
static StgTRecChunk *new_stg_trec_chunk(Capability *cap) {
361
  StgTRecChunk *result;
362
  result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk));
363
  SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM);
364
365
366
367
368
  result -> prev_chunk = END_STM_CHUNK_LIST;
  result -> next_entry_idx = 0;
  return result;
}

369
static StgTRecHeader *new_stg_trec_header(Capability *cap,
370
                                          StgTRecHeader *enclosing_trec) {
371
  StgTRecHeader *result;
372
  result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader));
373
374
  SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM);

375
  result -> enclosing_trec = enclosing_trec;
376
  result -> current_chunk = new_stg_trec_chunk(cap);
377
378
379
380
381

  if (enclosing_trec == NO_TREC) {
    result -> state = TREC_ACTIVE;
  } else {
    ASSERT(enclosing_trec -> state == TREC_ACTIVE ||
382
           enclosing_trec -> state == TREC_CONDEMNED);
383
384
385
386
387
388
    result -> state = enclosing_trec -> state;
  }

  return result;  
}

389
static StgTVar *new_tvar(Capability *cap,
390
391
                         StgClosure *new_value) {
  StgTVar *result;
392
  result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar));
393
394
395
396
397
398
399
400
401
  SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM);
  result -> current_value = new_value;
  result -> first_wait_queue_entry = END_STM_WAIT_QUEUE;
#if defined(SMP)
  result -> last_update_by = NO_TREC;
#endif
  return result;
}

402
403
404
405
/*......................................................................*/

// Helper functions for managing waiting lists

406
static void build_wait_queue_entries_for_trec(Capability *cap,
407
408
                                      StgTSO *tso, 
                                      StgTRecHeader *trec) {
409
410
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
411
412
413
414
  ASSERT(trec -> state == TREC_ACTIVE);

  TRACE("%p : build_wait_queue_entries_for_trec()\n", trec);

415
416
417
418
419
  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWaitQueue *q;
    StgTVarWaitQueue *fq;
    s = e -> tvar;
420
421
422
    TRACE("%p : adding tso=%p to wait queue for tvar=%p\n", trec, tso, s);
    ACQ_ASSERT(s -> current_value == trec);
    NACQ_ASSERT(s -> current_value == e -> expected_value);
423
    fq = s -> first_wait_queue_entry;
424
    q = new_stg_tvar_wait_queue(cap, tso);
425
426
427
428
429
430
431
432
433
434
    q -> next_queue_entry = fq;
    q -> prev_queue_entry = END_STM_WAIT_QUEUE;
    if (fq != END_STM_WAIT_QUEUE) {
      fq -> prev_queue_entry = q;
    }
    s -> first_wait_queue_entry = q;
    e -> new_value = (StgClosure *) q;
  });
}

435
static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) {
436
437
  ASSERT(trec != NO_TREC);
  ASSERT(trec -> enclosing_trec == NO_TREC);
438
  ASSERT(trec -> state == TREC_WAITING ||
439
440
441
442
         trec -> state == TREC_CONDEMNED);

  TRACE("%p : remove_wait_queue_entries_for_trec()\n", trec);

443
444
445
446
447
448
  FOR_EACH_ENTRY(trec, e, {
    StgTVar *s;
    StgTVarWaitQueue *pq;
    StgTVarWaitQueue *nq;
    StgTVarWaitQueue *q;
    s = e -> tvar;
449
    StgClosure *saw = lock_tvar(trec, s);
450
    q = (StgTVarWaitQueue *) (e -> new_value);
451
452
    TRACE("%p : removing tso=%p from wait queue for tvar=%p\n", trec, q -> waiting_tso, s);
    ACQ_ASSERT(s -> current_value == trec);
453
454
455
456
457
458
459
460
461
462
463
    nq = q -> next_queue_entry;
    pq = q -> prev_queue_entry;
    if (nq != END_STM_WAIT_QUEUE) {
      nq -> prev_queue_entry = pq;
    }
    if (pq != END_STM_WAIT_QUEUE) {
      pq -> next_queue_entry = nq;
    } else {
      ASSERT (s -> first_wait_queue_entry == q);
      s -> first_wait_queue_entry = nq;
    }
464
    unlock_tvar(trec, s, saw, FALSE);
465
466
467
468
469
  });
}
 
/*......................................................................*/
 
470
static TRecEntry *get_new_entry(Capability *cap,
471
                                StgTRecHeader *t) {
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
  TRecEntry *result;
  StgTRecChunk *c;
  int i;

  c = t -> current_chunk;
  i = c -> next_entry_idx;
  ASSERT(c != END_STM_CHUNK_LIST);

  if (i < TREC_CHUNK_NUM_ENTRIES) {
    // Continue to use current chunk
    result = &(c -> entries[i]);
    c -> next_entry_idx ++;
  } else {
    // Current chunk is full: allocate a fresh one
    StgTRecChunk *nc;
487
    nc = new_stg_trec_chunk(cap);
488
489
490
491
492
493
494
495
496
497
498
    nc -> prev_chunk = c;
    nc -> next_entry_idx = 1;
    t -> current_chunk = nc;
    result = &(nc -> entries[0]);
  }

  return result;
}

/*......................................................................*/

499
static void merge_update_into(Capability *cap,
500
                              StgTRecHeader *t,
501
502
                              StgTVar *tvar,
                              StgClosure *expected_value,
503
                              StgClosure *new_value) {
504
505
506
507
508
509
510
511
512
  int found;
  
  // Look for an entry in this trec
  found = FALSE;
  FOR_EACH_ENTRY(t, e, {
    StgTVar *s;
    s = e -> tvar;
    if (s == tvar) {
      found = TRUE;
513
514
515
516
517
518
      if (e -> expected_value != expected_value) {
        // Must abort if the two entries start from different values
        TRACE("%p : entries inconsistent at %p (%p vs %p)\n", 
              t, tvar, e -> expected_value, expected_value);
        t -> state = TREC_CONDEMNED;
      } 
519
520
521
522
523
524
525
526
      e -> new_value = new_value;
      BREAK_FOR_EACH;
    }
  });

  if (!found) {
    // No entry so far in this trec
    TRecEntry *ne;
527
    ne = get_new_entry(cap, t);
528
529
530
531
532
533
534
535
    ne -> tvar = tvar;
    ne -> expected_value = expected_value;
    ne -> new_value = new_value;
  }
}

/*......................................................................*/

536
537
538
539
540
static StgBool entry_is_update(TRecEntry *e) {
  StgBool result;
  result = (e -> expected_value != e -> new_value);
  return result;
} 
541

542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
// entry_is_read_only : TRUE when the entry's new value is the value it
// expected, i.e. the TVar was only read.
static StgBool entry_is_read_only(TRecEntry *e) {
  return (e -> new_value == e -> expected_value);
}

// tvar_is_locked : TRUE when s's current_value holds the TRec h, which is how
// a TVar locked by h is represented.
static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
  return (s -> current_value == (StgClosure *) h);
}

// revert_ownership : release the locks trec holds on TVars, storing back in
// each one the value it held when the lock was acquired (the entry's
// expected value).  "revert_all" is set by stmWait and stmReWait, which lock
// every TVar involved.  Commit operations leave it clear because they do not
// lock TVars that were only read.  Only STM_FG_LOCKS builds take real TVar
// locks, so in every other build this function does nothing.

static void revert_ownership(StgTRecHeader *trec STG_UNUSED,
                             StgBool revert_all STG_UNUSED) {
#if defined(STM_FG_LOCKS) 
  FOR_EACH_ENTRY(trec, e, {
    StgTVar *tv = e -> tvar;
    if ((revert_all || entry_is_update(e)) && tvar_is_locked(tv, trec)) {
      unlock_tvar(trec, tv, e -> expected_value, TRUE);
    }
  });
#endif
}
/*......................................................................*/

579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
// validate_and_acquire_ownership : this performs the twin functions
// of checking that the TVars referred to by entries in trec hold the
// expected values and:
// 
//   - locking the TVar (on updated TVars during commit, or all TVars
//     during wait)
//
//   - recording the identity of the TRec who wrote the value seen in the
//     TVar (on non-updated TVars during commit).  These values are 
//     stashed in the TRec entries and are then checked in check_read_only
//     to ensure that an atomic snapshot of all of these locations has been
//     seen.

static StgBool validate_and_acquire_ownership (StgTRecHeader *trec, 
                                               int acquire_all,
                                               int retain_ownership) {
  StgBool result;
596
597

  if (shake()) {
598
    TRACE("%p : shake, pretending trec is invalid when it may not be\n", trec);
599
600
601
    return FALSE;
  }

602
603
604
605
  ASSERT ((trec -> state == TREC_ACTIVE) || 
	  (trec -> state == TREC_WAITING) ||
	  (trec -> state == TREC_CONDEMNED));
  result = !((trec -> state) == TREC_CONDEMNED);
606
  if (result) {
607
    FOR_EACH_ENTRY(trec, e, {
608
609
      StgTVar *s;
      s = e -> tvar;
610
611
612
613
614
615
616
617
      if (acquire_all || entry_is_update(e)) {
        TRACE("%p : trying to acquire %p\n", trec, s);
        if (!cond_lock_tvar(trec, s, e -> expected_value)) {
          TRACE("%p : failed to acquire %p\n", trec, s);
          result = FALSE;
          BREAK_FOR_EACH;
        }
      } else {
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
        ASSERT(use_read_phase);
        IF_STM_FG_LOCKS({
          TRACE("%p : will need to check %p\n", trec, s);
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match\n", trec);
            result = FALSE;
            BREAK_FOR_EACH;
          }
          e -> saw_update_by = s -> last_update_by;
          if (s -> current_value != e -> expected_value) {
            TRACE("%p : doesn't match (race)\n", trec);
            result = FALSE;
            BREAK_FOR_EACH;
          } else {
            TRACE("%p : need to check update by %p\n", trec, e -> saw_update_by);
          }
        });
635
636
637
      }
    });
  }
638
639
640
641
642

  if ((!result) || (!retain_ownership)) {
    revert_ownership(trec, acquire_all);
  }
  
643
644
645
  return result;
}

646
647
648
649
650
651
652
653
654
655
// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec.  This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
// validate_and_acquire_ownership.  If no udpate is seen to any TVar than
// all of them contained their expected values at the start of the call to
// check_read_only.
//
// The paper "Concurrent programming without locks" (under submission), or
// Keir Fraser's PhD dissertation "Practical lock-free programming" discuss
// this kind of algorithm.
656

657
static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
658
659
  StgBool result = TRUE;

660
661
662
663
664
665
666
667
668
669
670
671
672
  ASSERT (use_read_phase);
  IF_STM_FG_LOCKS({
    FOR_EACH_ENTRY(trec, e, {
      StgTVar *s;
      s = e -> tvar;
      if (entry_is_read_only(e)) {
        TRACE("%p : check_read_only for TVar %p, saw %p\n", trec, s, e -> saw_update_by);
        if (s -> last_update_by != e -> saw_update_by) {
          // ||s -> current_value != e -> expected_value) {
          TRACE("%p : mismatch\n", trec);
          result = FALSE;
          BREAK_FOR_EACH;
        }
673
      }
674
    });
675
676
677
678
  });

  return result;
}
679
680
681
682
683


/************************************************************************/

void stmPreGCHook() {
684
  lock_stm(NO_TREC);
685
  TRACE("stmPreGCHook\n");
686
  unlock_stm(NO_TREC);
687
688
689
690
691
692
693
694
695
696
}

/************************************************************************/

// initSTM : STM initialisation.  Nothing needs setting up at present; it only
// emits a trace message.
void initSTM() {
  TRACE("initSTM, NO_TREC=%p\n", NO_TREC);
}

/*......................................................................*/

697
StgTRecHeader *stmStartTransaction(Capability *cap,
698
                                   StgTRecHeader *outer) {
699
  StgTRecHeader *t;
700
  TRACE("%p : stmStartTransaction\n", outer);
701
  t = new_stg_trec_header(cap, outer);
702
  TRACE("%p : stmStartTransaction()=%p\n", outer, t);
703
704
705
706
707
708
  return t;
}

/*......................................................................*/

// stmAbortTransaction : mark trec as aborted.  A waiting transaction (which
// is always top-level) is first removed from the wait queues of the TVars it
// accessed.
void stmAbortTransaction(StgTRecHeader *trec) {
  TRACE("%p : stmAbortTransaction\n", trec);
  ASSERT (trec != NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_WAITING) ||
          (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
  if (trec -> state == TREC_WAITING) {
    ASSERT (trec -> enclosing_trec == NO_TREC);
    TRACE("%p : stmAbortTransaction aborting waiting transaction\n", trec);
    remove_wait_queue_entries_for_trec(trec);
  }
  trec -> state = TREC_ABORTED;
  unlock_stm(trec);

  TRACE("%p : stmAbortTransaction done\n", trec);
}

/*......................................................................*/

729
void stmCondemnTransaction(StgTRecHeader *trec) {
730
  TRACE("%p : stmCondemnTransaction\n", trec);
731
732
733
  ASSERT (trec != NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
          (trec -> state == TREC_WAITING) ||
734
          (trec -> state == TREC_CONDEMNED));
735

736
  lock_stm(trec);
737
738
  if (trec -> state == TREC_WAITING) {
    ASSERT (trec -> enclosing_trec == NO_TREC);
739
740
    TRACE("%p : stmCondemnTransaction condemning waiting transaction\n", trec);
    remove_wait_queue_entries_for_trec(trec);
741
  } 
742
743
  trec -> state = TREC_CONDEMNED;
  unlock_stm(trec);
744

745
  TRACE("%p : stmCondemnTransaction done\n", trec);
746
747
748
749
}

/*......................................................................*/

750
751
StgTRecHeader *stmGetEnclosingTRec(StgTRecHeader *trec) {
  StgTRecHeader *outer;
752
  TRACE("%p : stmGetEnclosingTRec\n", trec);
753
  outer = trec -> enclosing_trec;
754
  TRACE("%p : stmGetEnclosingTRec()=%p\n", trec, outer);
755
756
757
758
759
  return outer;
}

/*......................................................................*/

760
761
762
763
764
StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) {
  StgTRecHeader *t;
  StgBool result;

  TRACE("%p : stmValidateNestOfTransactions\n", trec);
765
766
  ASSERT(trec != NO_TREC);
  ASSERT((trec -> state == TREC_ACTIVE) || 
767
768
769
770
         (trec -> state == TREC_WAITING) ||
         (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);
771

772
773
774
775
776
777
  t = trec;
  result = TRUE;
  while (t != NO_TREC) {
    result &= validate_and_acquire_ownership(t, TRUE, FALSE);
    t = t -> enclosing_trec;
  }
778
779

  if (!result && trec -> state != TREC_WAITING) {
780
    trec -> state = TREC_CONDEMNED; 
781
782
  }

783
  unlock_stm(trec);
784

785
  TRACE("%p : stmValidateNestOfTransactions()=%d\n", trec, result);
786
787
788
789
790
  return result;
}

/*......................................................................*/

791
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
792
  int result;
793

794
  TRACE("%p : stmCommitTransaction()\n", trec);
795
  ASSERT (trec != NO_TREC);
796
  ASSERT (trec -> enclosing_trec == NO_TREC);
797
  ASSERT ((trec -> state == TREC_ACTIVE) || 
798
          (trec -> state == TREC_CONDEMNED));
799

800
801
  lock_stm(trec);
  result = validate_and_acquire_ownership(trec, (!use_read_phase), TRUE);
802
  if (result) {
803
804
805
806
807
808
    // We now know that all the updated locations hold their expected values.
    ASSERT (trec -> state == TREC_ACTIVE);

    if (use_read_phase) {
      TRACE("%p : doing read check\n", trec);
      result = check_read_only(trec);
809
      TRACE("%p : read-check %s\n", trec, result ? "succeeded" : "failed");
810
811
812
813
814
815
    }
    
    if (result) {
      // We now know that all of the read-only locations held their exepcted values
      // at the end of the call to validate_and_acquire_ownership.  This forms the
      // linearization point of the commit.
816
817
818
819
      
      FOR_EACH_ENTRY(trec, e, {
        StgTVar *s;
        s = e -> tvar;
820
821
822
823
824
825
        if (e -> new_value != e -> expected_value) {
          // Entry is an update: write the value back to the TVar, unlocking it if
          // necessary.

          ACQ_ASSERT(tvar_is_locked(s, trec));
          TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s);
826
          unpark_waiters_on(cap,s);
827
828
829
          IF_STM_FG_LOCKS({
            s -> last_update_by = trec;
          });
830
831
832
          unlock_tvar(trec, s, e -> new_value, TRUE);
        } 
        ACQ_ASSERT(!tvar_is_locked(s, trec));
833
      });
834
835
    } else {
      revert_ownership(trec, FALSE);
836
837
838
    }
  } 

839
  unlock_stm(trec);
840

841
  TRACE("%p : stmCommitTransaction()=%d\n", trec, result);
842
843
844
845
846
847

  return result;
}

/*......................................................................*/

848
StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) {
849
  StgTRecHeader *et;
850
  int result;
851
852
853
854
855
856
857
858
  ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC);
  TRACE("%p : stmCommitNestedTransaction() into %p\n", trec, trec -> enclosing_trec);
  ASSERT ((trec -> state == TREC_ACTIVE) || (trec -> state == TREC_CONDEMNED));

  lock_stm(trec);

  et = trec -> enclosing_trec;
  result = validate_and_acquire_ownership(trec, FALSE, TRUE);
859
  if (result) {
860
861
862
863
864
865
    // We now know that all the updated locations hold their expected values.

    if (use_read_phase) {
      TRACE("%p : doing read check\n", trec);
      result = check_read_only(trec);
    }
866
    if (result) {
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
      // We now know that all of the read-only locations held their exepcted values
      // at the end of the call to validate_and_acquire_ownership.  This forms the
      // linearization point of the commit.

      if (result) {
        TRACE("%p : read-check succeeded\n", trec);
        FOR_EACH_ENTRY(trec, e, {
          // Merge each entry into the enclosing transaction record, release all
          // locks.

          StgTVar *s;
          s = e -> tvar;
          if (entry_is_update(e)) {
            unlock_tvar(trec, s, e -> expected_value, FALSE);
          }
882
          merge_update_into(cap, et, s, e -> expected_value, e -> new_value);
883
884
885
886
887
888
889
          ACQ_ASSERT(s -> current_value != trec);
        });
      } else {
        revert_ownership(trec, FALSE);
      }
    }
  } 
890

891
  unlock_stm(trec);
892

893
  TRACE("%p : stmCommitNestedTransaction()=%d\n", trec, result);
894
895
896
897
898
899

  return result;
}

/*......................................................................*/

900
StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) {
901
  int result;
902
  TRACE("%p : stmWait(%p)\n", trec, tso);
903
904
905
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
  ASSERT ((trec -> state == TREC_ACTIVE) || 
906
          (trec -> state == TREC_CONDEMNED));
907

908
909
  lock_stm(trec);
  result = validate_and_acquire_ownership(trec, TRUE, TRUE);
910
911
912
913
  if (result) {
    // The transaction is valid so far so we can actually start waiting.
    // (Otherwise the transaction was not valid and the thread will have to
    // retry it).
914
915
916
917

    // Put ourselves to sleep.  We retain locks on all the TVars involved
    // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM
    // in the TSO, (c) TREC_WAITING in the Trec.  
918
    build_wait_queue_entries_for_trec(cap, tso, trec);
919
920
921
    park_tso(tso);
    trec -> state = TREC_WAITING;

922
923
924
925
926
927
928
929
930
    // As soon as we start releasing ownership, another thread may find us 
    // and wake us up.  This may happen even before we have finished 
    // releasing ownership.
    revert_ownership(trec, TRUE);
  }  

  unlock_stm(trec);

  TRACE("%p : stmWait(%p)=%d\n", trec, tso, result);
931
932
933
934
935
  return result;
}

/*......................................................................*/

936
StgBool stmReWait(StgTSO *tso) {
937
  int result;
938
939
  StgTRecHeader *trec = tso->trec;

940
  TRACE("%p : stmReWait\n", trec);
941
942
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> enclosing_trec == NO_TREC);
943
  ASSERT ((trec -> state == TREC_WAITING) || 
944
          (trec -> state == TREC_CONDEMNED));
945

946
947
948
  lock_stm(trec);
  result = validate_and_acquire_ownership(trec, TRUE, TRUE);
  TRACE("%p : validation %s\n", trec, result ? "succeeded" : "failed");
949
950
951
952
  if (result) {
    // The transaction remains valid -- do nothing because it is already on
    // the wait queues
    ASSERT (trec -> state == TREC_WAITING);
953
    park_tso(tso);
954
    revert_ownership(trec, TRUE);
955
956
957
  } else {
    // The transcation has become invalid.  We can now remove it from the wait
    // queues.
958
959
    if (trec -> state != TREC_CONDEMNED) {
      remove_wait_queue_entries_for_trec (trec);
960
    }
961
962

  }
963
  unlock_stm(trec);
964

965
  TRACE("%p : stmReWait()=%d\n", trec, result);
966
967
968
969
970
  return result;
}

/*......................................................................*/

971
972
static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeader **in) {
  TRecEntry *result = NULL;
973

974
975
  TRACE("%p : get_entry_for TVar %p\n", trec, tvar);
  ASSERT(trec != NO_TREC);
976

977
978
  do {
    FOR_EACH_ENTRY(trec, e, {
979
      if (e -> tvar == tvar) {
980
981
982
983
        result = e;
        if (in != NULL) {
          *in = trec;
        }
984
985
986
        BREAK_FOR_EACH;
      }
    });
987
988
    trec = trec -> enclosing_trec;
  } while (result == NULL && trec != NO_TREC);
989

990
991
992
993
994
995
996
997
998
999
  return result;    
}

/* Return the value currently stored in `tvar`.  `trec` is used only in
 * trace output.
 */
static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
  StgClosure *seen = tvar -> current_value;

#if defined(STM_FG_LOCKS)
  // With fine-grained locking, current_value may hold a TRec header rather
  // than a real value (presumably the TVar is locked by a committing
  // transaction -- TODO confirm).  Keep re-reading until it does not.
  for (; GET_INFO(seen) == &stg_TREC_HEADER_info; seen = tvar -> current_value) {
    TRACE("%p : read_current_value(%p) saw %p\n", trec, tvar, seen);
  }
#endif

  TRACE("%p : read_current_value(%p)=%p\n", trec, tvar, seen);
  return seen;
}

/*......................................................................*/

1010
StgClosure *stmReadTVar(Capability *cap,
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
                        StgTRecHeader *trec, 
			StgTVar *tvar) {
  StgTRecHeader *entry_in;
  StgClosure *result = NULL;
  TRecEntry *entry = NULL;
  TRACE("%p : stmReadTVar(%p)\n", trec, tvar);
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> state == TREC_ACTIVE || 
          trec -> state == TREC_CONDEMNED);

  entry = get_entry_for(trec, tvar, &entry_in);

  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      result = entry -> new_value;
    } else {
      // Entry found in another trec
1029
      TRecEntry *new_entry = get_new_entry(cap, trec);
1030
1031
1032
1033
1034
1035
1036
1037
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = entry -> new_value;
      result = new_entry -> new_value;
    } 
  } else {
    // No entry found
    StgClosure *current_value = read_current_value(trec, tvar);
1038
    TRecEntry *new_entry = get_new_entry(cap, trec);
1039
1040
1041
1042
1043
1044
1045
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = current_value;
    result = current_value;
  }

  TRACE("%p : stmReadTVar(%p)=%p\n", trec, tvar, result);
1046
1047
1048
1049
1050
  return result;
}

/*......................................................................*/

1051
void stmWriteTVar(Capability *cap,
1052
                  StgTRecHeader *trec,
1053
1054
		  StgTVar *tvar, 
		  StgClosure *new_value) {
1055
1056

  StgTRecHeader *entry_in;
1057
  TRecEntry *entry = NULL;
1058
  TRACE("%p : stmWriteTVar(%p, %p)\n", trec, tvar, new_value);
1059
1060
  ASSERT (trec != NO_TREC);
  ASSERT (trec -> state == TREC_ACTIVE || 
1061
          trec -> state == TREC_CONDEMNED);
1062

1063
  entry = get_entry_for(trec, tvar, &entry_in);
1064

1065
1066
1067
1068
1069
1070
  if (entry != NULL) {
    if (entry_in == trec) {
      // Entry found in our trec
      entry -> new_value = new_value;
    } else {
      // Entry found in another trec
1071
      TRecEntry *new_entry = get_new_entry(cap, trec);
1072
1073
1074
1075
      new_entry -> tvar = tvar;
      new_entry -> expected_value = entry -> expected_value;
      new_entry -> new_value = new_value;
    } 
1076
1077
  } else {
    // No entry found
1078
    StgClosure *current_value = read_current_value(trec, tvar);
1079
    TRecEntry *new_entry = get_new_entry(cap, trec);
1080
1081
1082
    new_entry -> tvar = tvar;
    new_entry -> expected_value = current_value;
    new_entry -> new_value = new_value;
1083
1084
  }

1085
  TRACE("%p : stmWriteTVar done\n", trec);
1086
1087
1088
1089
}

/*......................................................................*/

1090
StgTVar *stmNewTVar(Capability *cap,
1091
1092
                    StgClosure *new_value) {
  StgTVar *result;
1093
  result = new_tvar(cap, new_value);
1094
1095
1096
1097
1098
  return result;
}

/*......................................................................*/