Improvements to coroutining support. These changes allow us to do

Estimated hours taken: 20

Improvements to coroutining support. These changes allow us to do
provide io primatives that cause the Mercury context to suspend rather
than causing the engine to block.

configure.in:
	Test to see if we can handle contexts that block on IO
	using select().

compiler/pragma_c_gen.m:
	Include the predicate name in the calls to MR_OBTAIN_GLOBAL_C_LOCK
	and MR_RELEASE_GLOBAL_C_LOCK for improved debugging.

	Fix a bug where the global lock was not being released when
	semidet pragma c code failed.

runtime/mercury_thread.h:
	Change the global lock macros to include the message generated
	by the changes to pragma_c_gen.

library/char.m:
library/std_util.m:
	include `thread_safe' in the flags for a couple of pragma c
	definitions that seem to have missed out.

runtime/mercury_context.{c,h}:
	Add a list of "pending" contexts that are blocked on a
	file descriptor. When the runqueue becomes empty, we call
	select on all the pending contexts.

	Move schedule from the header file to the c file (changing
	it from a macro to a function) for easier debugging at a
	slight performance cost.

	TODO: add a nonblocking call to select so that we can poll
	for io from time to time rather than waiting till there is
	nothing else to do.

runtime/mercury_reg_workarounds.{c,h}:
	Make functions that forward to the FD_* macros, which on Linux
	attempt to use registers that we've already grabbed. Aarrggh!

runtime/mercury_thread.c:
	Tidy up some of the conditional compilation.

runtime/mercury_types.h:
	Remove the definition of SpinLock since we're not using them
	and are not likely to any time soon.
This commit is contained in:
Thomas Conway
1998-12-15 00:22:29 +00:00
parent 44440da422
commit b2b99e5b50
10 changed files with 195 additions and 33 deletions

View File

@@ -382,14 +382,19 @@ pragma_c_gen__ordinary_pragma_c_code(CodeModel, Attributes,
%
% Code fragments to obtain and release the global lock
%
code_info__get_module_info(ModuleInfo),
{ ThreadSafe = thread_safe ->
ObtainLock = pragma_c_raw_code(""),
ReleaseLock = pragma_c_raw_code("")
;
ObtainLock =
pragma_c_raw_code("\tMR_OBTAIN_GLOBAL_C_LOCK();\n"),
ReleaseLock =
pragma_c_raw_code("\tMR_RELEASE_GLOBAL_C_LOCK();\n")
module_info_pred_info(ModuleInfo, PredId, PredInfo),
pred_info_name(PredInfo, Name),
string__append_list(["\tMR_OBTAIN_GLOBAL_C_LOCK(""",
Name, """);\n"], ObtainLockStr),
ObtainLock = pragma_c_raw_code(ObtainLockStr),
string__append_list(["\tMR_RELEASE_GLOBAL_C_LOCK(""",
Name, """);\n"], ReleaseLockStr),
ReleaseLock = pragma_c_raw_code(ReleaseLockStr)
},
%
@@ -447,7 +452,7 @@ pragma_c_gen__ordinary_pragma_c_code(CodeModel, Attributes,
% join all the components of the pragma_c together
%
{ Components = [InputComp, SaveRegsComp, ObtainLock, C_Code_Comp,
CheckR1_Comp, ReleaseLock, RestoreRegsComp,
ReleaseLock, CheckR1_Comp, RestoreRegsComp,
OutputComp] },
{ PragmaCCode = node([
pragma_c(Decls, Components, MayCallMercury, MaybeFailLabel, no)

View File

@@ -1147,6 +1147,32 @@ if test "$mercury_cv_has_tempnam" = yes; then
fi
AC_SUBST(IO_HAVE_TEMPNAM)
#-----------------------------------------------------------------------------#
AC_MSG_CHECKING(to see if we can handle contexts blocking on IO)
AC_CACHE_VAL(mercury_cv_can_do_pending_io,
AC_TRY_RUN([
#include <sys/types.h>
#include <sys/time.h>
int main() {
fd_set f;
struct timeval zero;
int err;
FD_ZERO(&f);
zero.tv_sec = 0;
zero.tv_usec = 0;
err = select(1, &f, &f, &f, &zero);
exit(err != 0);
}],
[mercury_cv_can_do_pending_io=yes],
[mercury_cv_can_do_pending_io=no],
[mercury_cv_can_do_pending_io=no])
)
AC_MSG_RESULT($mercury_cv_can_do_pending_io)
if test "$mercury_cv_can_do_pending_io" = yes; then
AC_DEFINE(MR_CAN_DO_PENDING_IO)
fi
AC_SUBST(MR_CAN_DO_PENDING_IO)
#-----------------------------------------------------------------------------#
#
# For Irix 5, gcc labels don't work with shared libraries,
# so if we're using gcc labels, we need to use non-shared libraries,

View File

@@ -401,16 +401,18 @@ char__lower_upper('z', 'Z').
%-----------------------------------------------------------------------------%
:- pragma c_code(char__to_int(Character::in, Int::out), will_not_call_mercury, "
:- pragma c_code(char__to_int(Character::in, Int::out),
[will_not_call_mercury, thread_safe] , "
Int = (UnsignedChar) Character;
").
:- pragma c_code(char__to_int(Character::in, Int::in), will_not_call_mercury, "
:- pragma c_code(char__to_int(Character::in, Int::in),
[will_not_call_mercury, thread_safe] , "
SUCCESS_INDICATOR = ((UnsignedChar) Character == Int);
").
:- pragma c_code(char__to_int(Character::out, Int::in), will_not_call_mercury,
"
:- pragma c_code(char__to_int(Character::out, Int::in),
[will_not_call_mercury, thread_safe] , "
/*
** If the integer doesn't fit into a char, then
** the assignment `Character = Int' below will truncate it.
@@ -427,7 +429,8 @@ char__lower_upper('z', 'Z').
char__min_char_value(0).
:- pragma c_header_code("#include <limits.h>").
:- pragma c_code(char__max_char_value(Max::out), will_not_call_mercury, "
:- pragma c_code(char__max_char_value(Max::out),
[will_not_call_mercury, thread_safe], "
Max = UCHAR_MAX;
").

View File

@@ -959,14 +959,17 @@ unsorted_aggregate(Generator, Accumulator, Acc0, Acc) :-
% to make sure that the compiler doesn't issue any determinism warnings
% for them.
:- pragma c_code(semidet_succeed, will_not_call_mercury,
:- pragma c_code(semidet_succeed, [will_not_call_mercury, thread_safe],
"SUCCESS_INDICATOR = TRUE;").
:- pragma c_code(semidet_fail, will_not_call_mercury,
:- pragma c_code(semidet_fail, [will_not_call_mercury, thread_safe],
"SUCCESS_INDICATOR = FALSE;").
:- pragma c_code(cc_multi_equal(X::in, Y::out), will_not_call_mercury,
:- pragma c_code(cc_multi_equal(X::in, Y::out),
[will_not_call_mercury, thread_safe],
"Y = X;").
:- pragma c_code(cc_multi_equal(X::di, Y::uo), will_not_call_mercury,
:- pragma c_code(cc_multi_equal(X::di, Y::uo),
[will_not_call_mercury, thread_safe],
"Y = X;").
%-----------------------------------------------------------------------------%
% The type `std_util:type_info/0' happens to use much the same

View File

@@ -56,6 +56,7 @@ HDRS = \
mercury_overflow.h \
mercury_prof.h \
mercury_prof_mem.h \
mercury_reg_workarounds.h \
mercury_regorder.h \
mercury_regs.h \
mercury_signal.h \
@@ -113,6 +114,7 @@ CFILES = \
mercury_misc.c \
mercury_prof.c \
mercury_prof_mem.c \
mercury_reg_workarounds.c \
mercury_regs.c \
mercury_signal.c \
mercury_stack_trace.c \

View File

@@ -233,6 +233,12 @@
*/
#undef MR_CANNOT_USE_STRUCTURE_ASSIGNMENT
/*
** To handle contexts suspended on IO operations we use the select() system
** call and supporting data structures which while POSIX, are not ANSI.
*/
#undef MR_CAN_DO_PENDING_IO
/*---------------------------------------------------------------------------*/
#include "mercury_conf_param.h"

View File

@@ -17,10 +17,15 @@ ENDINIT
#ifdef MR_THREAD_SAFE
#include "mercury_thread.h"
#endif
#ifdef MR_CAN_DO_PENDING_IO
#include <sys/types.h> /* for fd_set */
#include <sys/time.h> /* for struct timeval */
#endif
#include "mercury_memory_handlers.h"
#include "mercury_context.h"
#include "mercury_engine.h" /* for `MR_memdebug' */
#include "mercury_engine.h" /* for `MR_memdebug' */
#include "mercury_reg_workarounds.h" /* for `MR_fd*' stuff */
MR_Context *MR_runqueue_head;
MR_Context *MR_runqueue_tail;
@@ -29,6 +34,11 @@ MR_Context *MR_runqueue_tail;
MercuryCond *MR_runqueue_cond;
#endif
MR_PendingContext *MR_pending_contexts;
#ifdef MR_THREAD_SAFE
MercuryLock *MR_pending_contexts_lock;
#endif
/*
** free_context_list is a global linked list of unused context
** structures. If the MemoryZone pointers are not NULL,
@@ -57,6 +67,9 @@ init_thread_stuff(void)
pthread_mutex_init(&MR_global_lock, MR_MUTEX_ATTR);
MR_pending_contexts_lock = make(MercuryLock);
pthread_mutex_init(MR_pending_contexts_lock, MR_MUTEX_ATTR);
MR_KEY_CREATE(&MR_engine_base_key, NULL);
#endif
@@ -160,6 +173,101 @@ flounder(void)
fatal_error("computation floundered");
}
/*
** Check to see if any contexts that blocked on IO have become
** runnable. Return the number of contexts that are still blocked.
** The parameter specifies whether or not the call to select should
** block or not.
*/
static int
check_pending_contexts(Bool block)
{
#ifdef MR_CAN_DO_PENDING_IO
int err, max_id, n_ids;
fd_set rd_set, wr_set, ex_set;
struct timeval timeout;
MR_PendingContext *pctxt;
if (MR_pending_contexts == NULL) {
return 0;
}
MR_fd_zero(&rd_set); MR_fd_zero(&wr_set); MR_fd_zero(&ex_set);
max_id = -1;
for (pctxt = MR_pending_contexts ; pctxt ; pctxt = pctxt -> next) {
if (pctxt->waiting_mode & MR_PENDING_READ) {
if (max_id > pctxt->fd) max_id = pctxt->fd;
FD_SET(pctxt->fd, &rd_set);
}
if (pctxt->waiting_mode & MR_PENDING_WRITE) {
if (max_id > pctxt->fd) max_id = pctxt->fd;
FD_SET(pctxt->fd, &wr_set);
}
if (pctxt->waiting_mode & MR_PENDING_EXEC) {
if (max_id > pctxt->fd) max_id = pctxt->fd;
FD_SET(pctxt->fd, &ex_set);
}
}
max_id++;
if (max_id == 0) {
fatal_error("no fd's set!");
}
if (block) {
err = select(max_id, &rd_set, &wr_set, &ex_set, NULL);
} else {
timeout.tv_sec = 0;
timeout.tv_usec = 0;
err = select(max_id, &rd_set, &wr_set, &ex_set, &timeout);
}
if (err < 0) {
fatal_error("select failed!");
}
n_ids = 0;
for (pctxt = MR_pending_contexts; pctxt; pctxt = pctxt -> next) {
n_ids++;
if ( ((pctxt->waiting_mode & MR_PENDING_READ)
&& FD_ISSET(pctxt->fd, &rd_set))
|| ((pctxt->waiting_mode & MR_PENDING_WRITE)
&& FD_ISSET(pctxt->fd, &wr_set))
|| ((pctxt->waiting_mode & MR_PENDING_EXEC)
&& FD_ISSET(pctxt->fd, &ex_set))
)
{
schedule(pctxt->context);
}
}
return n_ids;
#else /* !MR_CAN_DO_PENDING_IO */
fatal_error("select() unavailable!");
#endif
}
void
schedule(MR_Context *ctxt)
{
ctxt->next = NULL;
MR_LOCK(MR_runqueue_lock, "schedule");
if (MR_runqueue_tail) {
MR_runqueue_tail->next = ctxt;
MR_runqueue_tail = ctxt;
} else {
MR_runqueue_head = ctxt;
MR_runqueue_tail = ctxt;
}
MR_SIGNAL(MR_runqueue_cond);
MR_UNLOCK(MR_runqueue_lock, "schedule");
}
Define_extern_entry(do_runnext);
BEGIN_MODULE(scheduler_module)
@@ -179,43 +287,54 @@ Define_entry(do_runnext);
MR_LOCK(MR_runqueue_lock, "do_runnext (i)");
while (1) {
if (MR_exit_now = TRUE) {
if (MR_exit_now == TRUE) {
MR_UNLOCK(MR_runqueue_lock, "do_runnext (ii)");
destroy_thread(MR_engine_base);
}
tmp = MR_runqueue_head;
/* XXX check pending io */
prev = NULL;
while(tmp != NULL) {
if (depth > 0 && tmp->owner_thread == thd
|| tmp->owner_thread == NULL)
if ((depth > 0 && tmp->owner_thread == thd)
|| (tmp->owner_thread == (MercuryThread) NULL)) {
break;
}
prev = tmp;
tmp = tmp->next;
}
if (tmp != NULL)
if (tmp != NULL) {
break;
}
MR_WAIT(MR_runqueue_cond, MR_runqueue_lock);
}
MR_ENGINE(this_context) = tmp;
if (prev != NULL)
if (prev != NULL) {
prev->next = tmp->next;
else
} else {
MR_runqueue_head = tmp->next;
if (MR_runqueue_tail == tmp)
}
if (MR_runqueue_tail == tmp) {
MR_runqueue_tail = prev;
}
MR_UNLOCK(MR_runqueue_lock, "do_runnext (iii)");
load_context(MR_ENGINE(this_context));
GOTO(MR_ENGINE(this_context)->resume);
}
#else /* !MR_THREAD_SAFE */
{
if (MR_runqueue_head == NULL)
if (MR_runqueue_head == NULL && MR_pending_contexts == NULL) {
fatal_error("empty runqueue!");
}
while (MR_runqueue_head == NULL) {
check_pending_contexts(TRUE); /* block */
}
MR_ENGINE(this_context) = MR_runqueue_head;
MR_runqueue_head = MR_runqueue_head->next;
if (MR_runqueue_head == NULL)
if (MR_runqueue_head == NULL) {
MR_runqueue_tail = NULL;
}
load_context(MR_ENGINE(this_context));
GOTO(MR_ENGINE(this_context)->resume);

View File

@@ -10,6 +10,7 @@
** fatal_error(),
** checked_malloc(),
** MR_memcpy
** MR_fd_zero
*/
#ifndef MERCURY_MISC_H

View File

@@ -15,7 +15,7 @@
#if defined(MR_DIGITAL_UNIX_PTHREADS)
#define MR_MUTEX_ATTR pthread_mutexattr_default
#define MR_COND_ATTR pthread_condattr_default
#define MR_THREAD_ATTR pthread_attr_default
#define MR_THREAD_ATTR pthread_attr_default
#else
#define MR_MUTEX_ATTR NULL
#define MR_COND_ATTR NULL
@@ -27,7 +27,7 @@
typedef pthread_mutex_t MercuryLock;
typedef pthread_cond_t MercuryCond;
#if 0
#ifndef MR_DEBUG_THREADS
/*
** The following macros should be used once the
** use of locking in the generated code is considered
@@ -57,11 +57,11 @@
** predicates which are not thread-safe.
** See the comments below.
*/
#define MR_OBTAIN_GLOBAL_C_LOCK() MR_mutex_lock(&MR_global_lock, \
"pragma c code");
#define MR_OBTAIN_GLOBAL_C_LOCK(where) MR_LOCK(&MR_global_lock, \
(where));
#define MR_RELEASE_GLOBAL_C_LOCK() MR_mutex_unlock(&MR_global_lock, \
"pragma c code");
#define MR_RELEASE_GLOBAL_C_LOCK(where) MR_UNLOCK(&MR_global_lock, \
(where));
#if defined(MR_DIGITAL_UNIX_PTHREADS)
#define MR_GETSPECIFIC(key) ({ \

View File

@@ -65,9 +65,6 @@ typedef const Char *ConstString;
/* continuation function type, for --high-level-C option */
typedef void (*Cont) (void);
/* spinlocks -- see mercury_spinlock.h */
typedef Word SpinLock;
/*
** semidet predicates indicate success or failure by leaving nonzero or zero
** respectively in register r1