Commit 3b9c5eb2 authored by sof's avatar sof
Browse files

[project @ 2002-01-31 11:18:06 by sof]

First steps towards implementing better interop between
Concurrent Haskell and native threads.

- factored out Capability handling into a separate source file
  (only the SMP build uses multiple capabilities tho).
- factored out OS/native threads handling into a separate
  source file, OSThreads.{c,h}. Currently, just a pthreads-based
  implementation; Win32 version to follow.
- scheduler code now distinguishes between multi-task threaded
  code (SMP) and single-task threaded code ('threaded RTS'),
  but sharing code between these two modes whenever poss.

i.e., just a first snapshot; the bulk of the transitioning code
remains to be implemented.
parent e282fcb3
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2001
*
* Capabilities
*
* The notion of a capability is used when operating in multi-threaded
* environments (which the SMP and Threads builds of the RTS do), to
* hold all the state an OS thread/task needs to run Haskell code:
* its STG registers, a pointer to its TSO, a nursery etc. During
* STG execution, a pointer to the capabilitity is kept in a
* register (BaseReg).
*
* Only in an SMP build will there be multiple capabilities, the threaded
* RTS and other non-threaded builds, there is one global capability,
* namely MainRegTable.
*
*
* --------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"
#include "RtsUtils.h"
#include "Capability.h"
void
initCapability( Capability *cap )
{
cap->f.stgChk0 = (F_)__stg_chk_0;
cap->f.stgChk1 = (F_)__stg_chk_1;
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
cap->f.stgUpdatePAP = (F_)__stg_update_PAP;
}
/* Free capability list.
* Locks required: sched_mutex.
*/
#if defined(SMP)
static Capability *free_capabilities; /* Available capabilities for running threads */
void grabCapability(Capability** cap)
{
*cap = free_capabilities;
free_capabilities = (*cap)->link;
rts_n_free_capabilities--;
}
void releaseCapability(Capability** cap)
{
(*cap)->link = free_capabilities;
free_capabilities = *cap;
rts_n_free_capabilities++;
return;
}
/* Allocate 'n' capabilities */
void
initCapabilities(nat n)
{
nat i;
Capability *cap, *prev;
cap = NULL;
prev = NULL;
for (i = 0; i < n; i++) {
cap = stgMallocBytes(sizeof(Capability), "initCapabilities");
initCapability(cap);
cap->link = prev;
prev = cap;
}
free_capabilities = cap;
rts_n_free_capabilities = n;
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n", n_free_capabilities););
}
#endif /* SMP */
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2001
*
* Capabilities
*
* The notion of a capability is used when operating in multi-threaded
* environments (which the SMP and Threads builds of the RTS do), to
* hold all the state an OS thread/task needs to run Haskell code:
* its STG registers, a pointer to its TSO, a nursery etc. During
* STG execution, a pointer to the capabilitity is kept in a
* register (BaseReg).
*
* Only in an SMP build will there be multiple capabilities, the threaded
* RTS and other non-threaded builds, there is one global capability,
* namely MainRegTable.
*
* This header file contains the functions for working with capabilities.
* (the main, and only, consumer of this interface is the scheduler).
*
* --------------------------------------------------------------------------*/
#ifndef __CAPABILITY_H__
#define __CAPABILITY_H__
#include "RtsFlags.h"
/* ToDo: assume that RtsFlags.h has been included at usage sites of Capability.h? */
extern void initCapability(Capability* cap);
#if defined(SMP)
extern nat rts_n_free_capabilities; /* total number of available capabilities */
static inline nat getFreeCapabilities()
{
return rts_n_free_capabilities;
}
static inline rtsBool noFreeCapabilities()
{
return (rts_n_free_capabilities == 0);
}
static inline rtsBool allFreeCapabilities()
{
return (rts_n_free_capabilities == RtsFlags.ParFlags.nNodes);
}
extern void initCapabilities(nat n);
extern void grabCapability(Capability** cap);
extern void releaseCapability(Capability** cap);
#endif /* SMP */
#endif /* __CAPABILITY_H__ */
#-----------------------------------------------------------------------------
# $Id: Makefile,v 1.59 2002/01/30 16:27:34 simonmar Exp $
# $Id: Makefile,v 1.60 2002/01/31 11:18:06 sof Exp $
#
# This is the Makefile for the runtime-system stuff.
# This stuff is written in C (and cannot be written in Haskell).
......@@ -97,6 +97,14 @@ ifeq "$(BootingFromHc)" "YES"
SRC_CC_OPTS += $(HC_OPTS)
endif
# Currently, you only get 'threads support' in the normal
# way.
ifeq "$(GhcRtsThreaded)" "YES"
ifeq "$(way)" ""
SRC_CC_OPTS += -DTHREADED_RTS
endif
endif
#-----------------------------------------------------------------------------
# Include the Front panel code?
......
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2001
*
* Accessing OS threads functionality in a (mostly) OS-independent
* manner.
*
*
* --------------------------------------------------------------------------*/
#include "Rts.h"
#if defined(RTS_SUPPORTS_THREADS)
#include "OSThreads.h"
#if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS)
/*
* This (allegedly) OS threads independent layer was initially
* abstracted away from code that used Pthreads, so the functions
* provided here are mostly just wrappers to the Pthreads API.
*
*/
void initCondVar( CondVar* pCond )
{
pthread_cond_init(pCond, NULL);
return;
}
void closeCondVar( CondVar* pCond )
{
pthread_cond_destroy(pCond);
return;
}
rtsBool
broadcastCondVar ( CondVar* pCond )
{
return (pthread_cond_broadcast(pCond) == 0);
}
rtsBool
signalCondVar ( CondVar* pCond )
{
return (pthread_cond_signal(pCond) == 0);
}
rtsBool
waitCondVar ( CondVar* pCond, MutexVar* pMut )
{
return (pthread_cond_wait(pCond,pMut) == 0);
}
void shutdownThread()
{
pthread_exit(NULL);
}
int createOSThread ( OSThreadId* pId, void *(*startProc)(void*))
{
return pthread_create(pId, NULL, startProc, NULL);
}
OSThreadId osThreadId()
{
return pthread_self();
}
void initMutexVar (MutexVar* pMut)
{
pthread_mutex_init(pMut,NULL);
return;
}
#elif defined(HAVE_WINDOWS_H)
/* Win32 threads and synchronisation objects */
/* A CondVar is represented by a Win32 Event object,
* a MutexVar by a Mutex kernel object.
*/
void initCondVar( CondVar* pCond )
{
HANDLE h = CreateEvent(NULL,
TRUE, /* manual reset */
TRUE, /* initially signalled */
NULL); /* unnamed => process-local. */
pthread_cond_init(pCond, NULL);
return;
}
void closeCondVar( CondVar* pCond )
{
pthread_cond_destroy(pCond);
return;
}
rtsBool
broadcastCondVar ( CondVar* pCond )
{
return (pthread_cond_broadcast(pCond) == 0);
}
rtsBool
signalCondVar ( CondVar* pCond )
{
return (pthread_cond_signal(pCond) == 0);
}
rtsBool
waitCondVar ( CondVar* pCond, MutexVar* pMut )
{
return (pthread_cond_wait(pCond,pMut) == 0);
}
void shutdownThread()
{
pthread_exit(NULL);
}
int createOSThread ( OSThreadId* pId, void *(*startProc)(void*))
{
return pthread_create(pId, NULL, startProc, NULL);
}
OSThreadId osThreadId()
{
return pthread_self();
}
void initMutexVar (MutexVar* pMut)
{
pthread_mutex_init(pMut);
return;
}
#endif /* defined(HAVE_PTHREAD_H) */
#endif /* defined(RTS_SUPPORTS_THREADS) */
/* ---------------------------------------------------------------------------
*
* (c) The GHC Team, 2001
*
* Accessing OS threads functionality in a (mostly) OS-independent
* manner.
*
*
* --------------------------------------------------------------------------*/
#ifndef __OSTHREADS_H__
#define __OSTHREADS_H__
#if defined(RTS_SUPPORTS_THREADS) /*to the end */
#if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS)
#include <pthread.h>
typedef pthread_cond_t CondVar;
typedef pthread_mutex_t MutexVar;
typedef pthread_t OSThreadId;
#define INIT_MUTEX_VAR PTHREAD_MUTEX_INITIALIZER
#define INIT_COND_VAR PTHREAD_COND_INITIALIZER
#elif defined(HAVE_WINDOWS_H)
#include <windows.h>
typedef HANDLE CondVar;
typedef HANDLE MutexVar;
typedef HANDLE OSThreadId;
#define INIT_MUTEX_VAR 0
#define INIT_COND_VAR 0
#else
#error "Threads not supported"
#endif
extern void initCondVar ( CondVar* pCond );
extern void closeCondVar ( CondVar* pCond );
extern rtsBool broadcastCondVar (CondVar* pCond );
extern rtsBool signalCondVar ( CondVar* pCond );
extern rtsBool waitCondVar ( CondVar* pCond, MutexVar* pMut);
extern OSThreadId osThreadId(void);
extern void shutdownThread(void);
extern int createOSThread ( OSThreadId* tid, void *(*startProc)(void*));
extern void initMutexVar ( MutexVar* pMut );
#endif /* defined(RTS_SUPPORTS_THREADS) */
#endif /* __OSTHREADS_H__ */
/* ---------------------------------------------------------------------------
* $Id: Schedule.c,v 1.113 2002/01/24 07:50:02 sof Exp $
* $Id: Schedule.c,v 1.114 2002/01/31 11:18:07 sof Exp $
*
* (c) The GHC Team, 1998-2000
*
......@@ -10,10 +10,11 @@
*
* WAY Name CPP flag What's it for
* --------------------------------------
* mp GUM PAR Parallel execution on a distributed memory machine
* s SMP SMP Parallel execution on a shared memory machine
* mg GranSim GRAN Simulation of parallel execution
* md GUM/GdH DIST Distributed execution (based on GUM)
* mp GUM PAR Parallel execution on a distributed memory machine
* s SMP SMP Parallel execution on a shared memory machine
* mg GranSim GRAN Simulation of parallel execution
* md GUM/GdH DIST Distributed execution (based on GUM)
*
* --------------------------------------------------------------------------*/
//@node Main scheduling code, , ,
......@@ -109,6 +110,8 @@
# include "HLC.h"
#endif
#include "Sparks.h"
#include "Capability.h"
#include "OSThreads.h"
#include <stdarg.h>
......@@ -119,7 +122,7 @@
*
* These are the threads which clients have requested that we run.
*
* In an SMP build, we might have several concurrent clients all
* In a 'threaded' build, we might have several concurrent clients all
* waiting for results, and each one will wait on a condition variable
* until the result is available.
*
......@@ -134,8 +137,8 @@ typedef struct StgMainThread_ {
StgTSO * tso;
SchedulerStatus stat;
StgClosure ** ret;
#ifdef SMP
pthread_cond_t wakeup;
#if defined(RTS_SUPPORTS_THREADS)
CondVar wakeup;
#endif
struct StgMainThread_ *link;
} StgMainThread;
......@@ -224,13 +227,8 @@ StgThreadID next_thread_id = 1;
#define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
/* Free capability list.
* Locks required: sched_mutex.
*/
#ifdef SMP
Capability *free_capabilities; /* Available capabilities for running threads */
nat n_free_capabilities; /* total number of available capabilities */
#else
#if !defined(SMP)
Capability MainCapability; /* for non-SMP, we have one global capability */
#endif
......@@ -248,7 +246,7 @@ rtsBool ready_to_gc;
/* All our current task ids, saved in case we need to kill them later.
*/
#ifdef SMP
#if defined(SMP)
//@cindex task_ids
task_info *task_ids;
#endif
......@@ -269,15 +267,14 @@ static void detectBlackHoles ( void );
static void sched_belch(char *s, ...);
#endif
#ifdef SMP
//@cindex sched_mutex
//@cindex term_mutex
//@cindex thread_ready_cond
//@cindex gc_pending_cond
pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER;
pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER;
#if defined(RTS_SUPPORTS_THREADS)
/* ToDo: carefully document the invariants that go together
* with these synchronisation objects.
*/
MutexVar sched_mutex = INIT_MUTEX_VAR;
MutexVar term_mutex = INIT_MUTEX_VAR;
CondVar thread_ready_cond = INIT_COND_VAR;
CondVar gc_pending_cond = INIT_COND_VAR;
nat await_death;
#endif
......@@ -430,7 +427,7 @@ schedule( void )
* should be done more efficiently without a linear scan
* of the main threads list, somehow...
*/
#ifdef SMP
#if defined(RTS_SUPPORTS_THREADS)
{
StgMainThread *m, **prev;
prev = &main_threads;
......@@ -442,7 +439,7 @@ schedule( void )
}
*prev = m->link;
m->stat = Success;
pthread_cond_broadcast(&m->wakeup);
broadcastCondVar(&m->wakeup);
break;
case ThreadKilled:
if (m->ret) *(m->ret) = NULL;
......@@ -452,7 +449,7 @@ schedule( void )
} else {
m->stat = Killed;
}
pthread_cond_broadcast(&m->wakeup);
broadcastCondVar(&m->wakeup);
break;
default:
break;
......@@ -460,7 +457,7 @@ schedule( void )
}
}
#else // not SMP
#else /* not threaded */
# if defined(PAR)
/* in GUM do this only on the Main PE */
......@@ -500,7 +497,7 @@ schedule( void )
*/
#if 0 /* defined(SMP) */
{
nat n = n_free_capabilities;
nat n = getFreeCapabilities();
StgTSO *tso = run_queue_hd;
/* Count the run queue */
......@@ -527,8 +524,8 @@ schedule( void )
/* We need to wake up the other tasks if we just created some
* work for them.
*/
if (n_free_capabilities - n > 1) {
pthread_cond_signal(&thread_ready_cond);
if (getFreeCapabilities() - n > 1) {
signalCondVar ( &thread_ready_cond );
}
}
#endif // SMP
......@@ -549,8 +546,8 @@ schedule( void )
if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
awaitEvent(
(run_queue_hd == END_TSO_QUEUE)
#ifdef SMP
&& (n_free_capabilities == RtsFlags.ParFlags.nNodes)
#if defined(SMP)
&& allFreeCapabilities()
#endif
);
}
......@@ -572,8 +569,8 @@ schedule( void )
if (blocked_queue_hd == END_TSO_QUEUE
&& run_queue_hd == END_TSO_QUEUE
&& sleeping_queue == END_TSO_QUEUE
#ifdef SMP
&& (n_free_capabilities == RtsFlags.ParFlags.nNodes)
#if defined(SMP)
&& allFreeCapabilities()
#endif
)
{
......@@ -586,13 +583,14 @@ schedule( void )
IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
detectBlackHoles();
// No black holes, so probably a real deadlock. Send the
// current main thread the Deadlock exception (or in the SMP
// build, send *all* main threads the deadlock exception,
// since none of them can make progress).
/* No black holes, so probably a real deadlock. Send the
* current main thread the Deadlock exception (or in the SMP
* build, send *all* main threads the deadlock exception,
* since none of them can make progress).
*/
if (run_queue_hd == END_TSO_QUEUE) {
StgMainThread *m;
#ifdef SMP
#if defined(RTS_SUPPORTS_THREADS)
for (m = main_threads; m != NULL; m = m->link) {
switch (m->tso->why_blocked) {
case BlockedOnBlackHole:
......@@ -621,28 +619,36 @@ schedule( void )
}
#endif
}
#if !defined(RTS_SUPPORTS_THREADS)
ASSERT( run_queue_hd != END_TSO_QUEUE );
#endif
}
}
#elif defined(PAR)
/* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
#endif
#ifdef SMP
#if defined(SMP)
/* If there's a GC pending, don't do anything until it has
* completed.
*/
if (ready_to_gc) {
IF_DEBUG(scheduler,sched_belch("waiting for GC"));
pthread_cond_wait(&gc_pending_cond, &sched_mutex);
waitCondVar ( &gc_pending_cond, &sched_mutex );
}
#endif
#if defined(RTS_SUPPORTS_THREADS)
/* block until we've got a thread on the run queue and a free
* capability.
*/
while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
while ( run_queue_hd == END_TSO_QUEUE
#if defined(SMP)
|| noFreeCapabilities()
#endif
) {
IF_DEBUG(scheduler, sched_belch("waiting for work"));
pthread_cond_wait(&thread_ready_cond, &sched_mutex);
waitCondVar ( &thread_ready_cond, &sched_mutex );
IF_DEBUG(scheduler, sched_belch("work now available"));
}
#endif
......@@ -933,12 +939,8 @@ schedule( void )
#endif
/* grab a capability
*/
#ifdef SMP
cap = free_capabilities;
free_capabilities = cap->link;
n_free_capabilities--;
grabCapability(&cap);
#else
cap = &MainCapability;
#endif
......@@ -1002,7 +1004,7 @@ schedule( void )
ACQUIRE_LOCK(&sched_mutex);
#ifdef SMP
IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
#elif !defined(GRAN) && !defined(PAR)
IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
#endif
......@@ -1292,9 +1294,7 @@ schedule( void )
}
#ifdef SMP
cap->link = free_capabilities;
free_capabilities = cap;
n_free_capabilities++;
grabCapability(&cap);
#endif
#ifdef PROFILING
......@@ -1307,7 +1307,7 @@ schedule( void )
#endif
#ifdef SMP
if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes)
if (ready_to_gc && allFreeCapabilities() )
#else
if (ready_to_gc)
#endif
......@@ -1317,13 +1317,13 @@ schedule( void )
* to do it in another thread. Either way, we need to
* broadcast on gc_pending_cond afterward.
*/
#ifdef SMP
#if defined(RTS_SUPPORTS_THREADS)
IF_DEBUG(scheduler,sched_belch("doing GC"));
#endif
GarbageCollect(GetRoots,rtsFalse);
ready_to_gc = rtsFalse;
#ifdef SMP
pthread_cond_broadcast(&gc_pending_cond);
broadcastCondVar(&gc_pending_cond);
#endif
#if defined(GRAN)
/* add a ContinueThread event to continue execution of current thread */
......@@ -1421,9 +1421,8 @@ suspendThread( StgRegTable *reg )
tok = cap->r.rCurrentTSO->id;
#ifdef SMP
cap->link = free_capabilities;
free_capabilities = cap;
n_free_capabilities++;
/* Hand back capability */
releaseCapability(&cap);
#endif
RELEASE_LOCK(&sched_mutex);
......@@ -1453,14 +1452,12 @@ resumeThread( StgInt tok )
tso->link = END_TSO_QUEUE;
#ifdef SMP
while (free_capabilities == NULL) {