mirror of
https://github.com/Mercury-Language/mercury.git
synced 2026-04-15 01:13:30 +00:00
Add a future data type for concurrent and parallel programming
library/library.m:
library/thread.future.m:
library/thread.m:
Add new future standard library module.
NEWS:
Announce the new addition.
library/thread.semaphore.m:
Add an impure interface to thread.semaphore.m. Semaphores are used to
implement our other concurrency primitives and an impure interface can
often be useful to implement things such as futures, which don't require
IO state threading. The impure interface predicate names are prefixed
with "impure_".
library/thread.semaphore.m:
NEWS:
Deprecate the impure init/1 function.
library/thread.mvar.m:
Conform to changes in semaphore.m.
benchmarks/progs/mandelbrot/mandelbrot.m:
Add future example to mandelbrot benchmark.
This commit is contained in:
10
NEWS
10
NEWS
@@ -24,12 +24,18 @@ Changes to the Mercury standard library:
|
||||
highly efficient set implementation for fat sets. This module is a
|
||||
contribution from Yes Logic Pty. Ltd.
|
||||
|
||||
* We have added a module that implements barriers for concurrent
|
||||
programming. This module is a contribution from Mission Critical IT.
|
||||
+ We have added two new modules for concurrent programming: thread.barrier
|
||||
and thread.future. The barrier module provides a barrier type which can
|
||||
be used to control progress in concurrent code. The future module
|
||||
provides future and future_io data types which can be used to compute
|
||||
values in parallel using other threads. These modules were contributed by
|
||||
Mission Critical IT.
|
||||
|
||||
* We have added thread.spawn_native/4 to dedicate an OS thread to a Mercury
|
||||
thread. thread.spawn/4 was added as well.
|
||||
|
||||
+ We have deprecated the impure init/1 function in thread.semaphore.
|
||||
|
||||
* In C grades, the math module now provides the fused multiply-add operation
|
||||
on platforms that support it.
|
||||
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
:- import_module require.
|
||||
:- import_module string.
|
||||
:- import_module thread.
|
||||
:- import_module thread.future.
|
||||
:- import_module thread.mvar.
|
||||
|
||||
main(!IO) :-
|
||||
@@ -115,6 +116,7 @@ default_options(parallel, string("no")).
|
||||
---> parallel_conj
|
||||
; parallel_spawn
|
||||
; parallel_spawn_native
|
||||
; parallel_future
|
||||
; sequential.
|
||||
|
||||
:- pred process_options(option_table(option)::in, maybe_error(options)::out)
|
||||
@@ -143,13 +145,16 @@ process_options(Table, MaybeOptions) :-
|
||||
;
|
||||
ParallelStr = "spawn_native",
|
||||
Parallel0 = parallel_spawn_native
|
||||
;
|
||||
ParallelStr = "future",
|
||||
Parallel0 = parallel_future
|
||||
)
|
||||
->
|
||||
MaybeParallel = ok(Parallel0)
|
||||
;
|
||||
MaybeParallel = error(
|
||||
"Parallel must be one of ""no"", ""conj"", ""spawn"" or " ++
|
||||
"""spawn_native""")
|
||||
"Parallel must be one of ""no"", ""conj"", ""spawn"", " ++
|
||||
"""spawn_native"" or ""future""")
|
||||
),
|
||||
|
||||
getopt.lookup_maybe_int_option(Table, dim_x, MaybeX),
|
||||
@@ -198,9 +203,11 @@ usage(!IO) :-
|
||||
"\t\tThe dimensions of the image, specify neither or both\n", !IO),
|
||||
write_string("\t-p <how> --parallel <how>\n", !IO),
|
||||
write_string(
|
||||
"\t\t<how> is one of ""no"", ""conj"", ""spawn"" or\n", !IO),
|
||||
"\t\t<how> is one of ""no"", ""conj"", ""spawn"",\n", !IO),
|
||||
write_string(
|
||||
"\t\t""spawn_native"". These may be grade dependent.\n", !IO),
|
||||
"\t\t""spawn_native"" or ""future"". These may be grade", !IO),
|
||||
write_string(
|
||||
"\t\tdependent.\n", !IO),
|
||||
write_string("\t-d --dependent-conjunctions\n", !IO),
|
||||
write_string(
|
||||
"\t\tUse an accumulator to represent the rows rendered so far\n", !IO).
|
||||
@@ -241,14 +248,20 @@ draw_rows(Options, StartY, StepY, DimY, StartX, StepX, DimX, Rows) :-
|
||||
:- pred draw_rows_dep(parallel::in, list(float)::in, list(float)::in,
|
||||
cord(colour)::out) is det.
|
||||
|
||||
draw_rows_dep(sequential, Xs, Ys, Rows) :-
|
||||
map_foldl(draw_row(Xs), append_row, Ys, empty, Rows).
|
||||
draw_rows_dep(parallel_conj, Xs, Ys, Rows) :-
|
||||
map_foldl_par_conj(draw_row(Xs), append_row, Ys, empty, Rows).
|
||||
draw_rows_dep(parallel_spawn, Xs, Ys, Rows) :-
|
||||
map_foldl_par_spawn(draw_row(Xs), append_row, Ys, empty, Rows).
|
||||
draw_rows_dep(parallel_spawn_native, Xs, Ys, Rows) :-
|
||||
map_foldl_par_spawn_native(draw_row(Xs), append_row, Ys, empty, Rows).
|
||||
draw_rows_dep(Parallel, Xs, Ys, Rows) :-
|
||||
(
|
||||
Parallel = sequential,
|
||||
map_foldl(draw_row(Xs), append_row, Ys, empty, Rows)
|
||||
;
|
||||
Parallel = parallel_conj,
|
||||
map_foldl_par_conj(draw_row(Xs), append_row, Ys, empty, Rows)
|
||||
;
|
||||
( Parallel = parallel_spawn
|
||||
; Parallel = parallel_spawn_native
|
||||
; Parallel = parallel_future
|
||||
),
|
||||
sorry($file, $pred, string(Parallel))
|
||||
).
|
||||
|
||||
:- pred draw_rows_indep(parallel::in, list(float)::in, list(float)::in,
|
||||
cord(colour)::out) is det.
|
||||
@@ -270,6 +283,9 @@ draw_rows_indep(Parallel, Xs, Ys, Rows) :-
|
||||
promise_equivalent_solutions [RowList] (
|
||||
my_map_par_spawn_native(draw_row(Xs), Ys, RowList)
|
||||
)
|
||||
;
|
||||
Parallel = parallel_future,
|
||||
my_map_par_future(draw_row(Xs), Ys, RowList)
|
||||
),
|
||||
foldl(append_row, RowList, empty, Rows).
|
||||
|
||||
@@ -375,22 +391,6 @@ map_foldl_par_conj(M, F, [X | Xs], !Acc) :-
|
||||
map_foldl_par_conj(M, F, Xs, !Acc)
|
||||
).
|
||||
|
||||
:- pred map_foldl_par_spawn(pred(X, Y), pred(Y, A, A), list(X), A, A).
|
||||
:- mode map_foldl_par_spawn(pred(in, out) is det, pred(in, in, out) is det,
|
||||
in, in, out) is erroneous.
|
||||
|
||||
map_foldl_par_spawn(_, _, _, !Acc) :-
|
||||
% XXX: Do the parallel conjunction transformation by hand.
|
||||
sorry($file, $pred, "Unimplemented").
|
||||
|
||||
:- pred map_foldl_par_spawn_native(pred(X, Y), pred(Y, A, A), list(X), A, A).
|
||||
:- mode map_foldl_par_spawn_native(pred(in, out) is det,
|
||||
pred(in, in, out) is det, in, in, out) is erroneous.
|
||||
|
||||
map_foldl_par_spawn_native(_, _, _, !Acc) :-
|
||||
% XXX: Do the parallel conjunction transformation by hand.
|
||||
sorry($file, $pred, "Unimplemented").
|
||||
|
||||
:- pred my_map(pred(X, Y), list(X), list(Y)).
|
||||
:- mode my_map(pred(in, out) is det, in, out) is det.
|
||||
|
||||
@@ -407,6 +407,16 @@ my_map_par_conj(M, [X | Xs], [Y | Ys]) :-
|
||||
M(X, Y) &
|
||||
my_map_par_conj(M, Xs, Ys).
|
||||
|
||||
:- pred my_map_par_future(pred(X, Y), list(X), list(Y)).
|
||||
:- mode my_map_par_future(pred(in, out) is det, in, out) is det.
|
||||
|
||||
my_map_par_future(_, [], []).
|
||||
my_map_par_future(M, [X | Xs], Ys) :-
|
||||
FutY = future((func) = Y0 :- M(X, Y0)),
|
||||
my_map_par_future(M, Xs, Ys0),
|
||||
Y = wait(FutY),
|
||||
Ys = [Y | Ys0].
|
||||
|
||||
:- pred my_map_par_spawn(pred(X, Y), list(X), list(Y)).
|
||||
:- mode my_map_par_spawn(pred(in, out) is det, in, out) is cc_multi.
|
||||
|
||||
|
||||
@@ -301,6 +301,7 @@ mercury_std_library_module("time").
|
||||
mercury_std_library_module("thread").
|
||||
mercury_std_library_module("thread.barrier").
|
||||
mercury_std_library_module("thread.channel").
|
||||
mercury_std_library_module("thread.future").
|
||||
mercury_std_library_module("thread.mvar").
|
||||
mercury_std_library_module("thread.semaphore").
|
||||
mercury_std_library_module("tree234").
|
||||
|
||||
281
library/thread.future.m
Normal file
281
library/thread.future.m
Normal file
@@ -0,0 +1,281 @@
|
||||
%-----------------------------------------------------------------------------%
|
||||
% vim: ft=mercury ts=4 sw=4 et
|
||||
%-----------------------------------------------------------------------------%
|
||||
% Copyright (C) 2014 The Mercury Team.
|
||||
% This file may only be copied under the terms of the GNU Library General
|
||||
% Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
%-----------------------------------------------------------------------------%
|
||||
%
|
||||
% File: thread.future.m.
|
||||
% Authors: pbone.
|
||||
% Stability: low.
|
||||
%
|
||||
% This module defines the data types future_io/1 and future/1 which are
|
||||
% useful for parallel and concurrent programming.
|
||||
%
|
||||
% A future represents a value that might not exist yet. A value for a
|
||||
% future may be provided exactly once, but can be read any number of times.
|
||||
% In these situations futures can be faster than mvars as their
|
||||
% implementation is simpler: they need only one semaphore and they can avoid
|
||||
% using it in some cases.
|
||||
%
|
||||
% There are two kinds of futures:
|
||||
%
|
||||
% + future(T) is a value that will be evaluated by another thread. The
|
||||
% function future/1 will spawn a new thread to evaluate its argument
|
||||
% whose result can be retrieved later by calling the function wait/1.
|
||||
% For example:
|
||||
%
|
||||
% Future = future(SomeFunction),
|
||||
% ... do something in the meantime ...
|
||||
% Value = wait(Future).
|
||||
%
|
||||
% + future_io(T) provides more flexibility, allowing the caller to control
|
||||
% the creation of the thread that provides its value. It can be used
|
||||
% as follows:
|
||||
%
|
||||
% First:
|
||||
% future(Future, !IO),
|
||||
%
|
||||
% Then in a separate thread:
|
||||
% signal(Future, Value0, !IO),
|
||||
%
|
||||
% Finally, in the original thread:
|
||||
% wait(Future, Value, !IO),
|
||||
%
|
||||
% This is more flexible because the thread can be used to signal
|
||||
% multiple futures or do other things, but it requires the I/O state.
|
||||
%
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
:- module thread.future.
|
||||
:- interface.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
% future/1 represents a value that will be computed by another thread.
|
||||
%
|
||||
:- type future(T).
|
||||
|
||||
% Create a future which has the value that the argument, when evaluated,
|
||||
% will produce. This function will create a thread to evaluate the
|
||||
% argument using spawn/3.
|
||||
%
|
||||
% If the argument throws an exception, that exception will be rethrown by
|
||||
% wait/1.
|
||||
%
|
||||
:- func future((func) = T) = future(T).
|
||||
|
||||
% Return the value of the future, blocking until the value is available.
|
||||
%
|
||||
:- func wait(future(T)) = T.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
% future_io/1 represents a value that may not have been computed yet.
|
||||
% Future values are intended to be computed by separate threads (using
|
||||
% spawn/3).
|
||||
%
|
||||
% Generally in computer science and in some other languages this is
|
||||
% known as a promise. We called it future_io because promise is a
|
||||
% reserved word in Mercury.
|
||||
%
|
||||
:- type future_io(T).
|
||||
|
||||
% Create a new empty future_io.
|
||||
%
|
||||
:- pred init(future_io(T)::uo, io::di, io::uo) is det.
|
||||
|
||||
% Provide a value for the future_io and signal any waiting threads. Any
|
||||
% further calls to wait will return immediately.
|
||||
%
|
||||
% Calling signal multiple times will result in undefined behaviour.
|
||||
%
|
||||
:- pred signal(future_io(T)::in, T::in, io::di, io::uo) is det.
|
||||
|
||||
% Return the future_io's value, potentially blocking until it is
|
||||
% signaled.
|
||||
%
|
||||
:- pred wait(future_io(T)::in, T::out, io::di, io::uo) is det.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
:- implementation.
|
||||
|
||||
:- import_module exception.
|
||||
:- import_module thread.semaphore.
|
||||
:- import_module mutvar.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
:- type future(T)
|
||||
---> future(future_io(ok_or_exception(T))).
|
||||
|
||||
:- type ok_or_exception(T)
|
||||
---> ok(T)
|
||||
; some [E] exception(E).
|
||||
|
||||
:- pragma promise_pure(future/1).
|
||||
|
||||
future(Func) = Future :-
|
||||
impure init(FutureIO),
|
||||
Future = future(FutureIO),
|
||||
impure spawn_impure(run_future(Future, Func)).
|
||||
|
||||
:- impure pred run_future(future(T), (func) = T).
|
||||
:- mode run_future(in, ((func) = out) is det) is cc_multi.
|
||||
|
||||
run_future(future(Future), Func) :-
|
||||
( try []
|
||||
Result = apply(Func)
|
||||
then
|
||||
impure signal(Future, ok(Result))
|
||||
catch_any Exp ->
|
||||
impure signal(Future, 'new exception'(Exp))
|
||||
).
|
||||
|
||||
wait(future(Future)) = Value :-
|
||||
wait(Future, Result),
|
||||
(
|
||||
Result = ok(Value)
|
||||
;
|
||||
Result = exception(Exception),
|
||||
throw(Exception)
|
||||
).
|
||||
|
||||
:- impure pred spawn_impure(impure (pred)).
|
||||
:- mode spawn_impure((pred) is cc_multi) is det.
|
||||
|
||||
spawn_impure(Task) :-
|
||||
impure make_io_state(IO0),
|
||||
promise_equivalent_solutions [IO] (
|
||||
spawn(spawn_impure_2(Task), IO0, IO)
|
||||
),
|
||||
impure consume_io_state(IO).
|
||||
|
||||
:- pred spawn_impure_2(impure (pred), io, io).
|
||||
:- mode spawn_impure_2((pred) is cc_multi, di, uo) is cc_multi.
|
||||
:- pragma promise_pure(spawn_impure_2/3).
|
||||
|
||||
spawn_impure_2(Task, !IO) :-
|
||||
impure Task.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
:- type future_io(T)
|
||||
---> future_io(
|
||||
f_ready :: mutvar(ready),
|
||||
% f_ready is used to optimistically avoid locking. It is
|
||||
% also used to try to detect multiple calls to signal/2.
|
||||
|
||||
f_wait :: semaphore,
|
||||
f_value :: mutvar(T)
|
||||
).
|
||||
|
||||
:- type ready
|
||||
---> ready
|
||||
; not_ready.
|
||||
|
||||
:- pragma promise_pure(init/3).
|
||||
|
||||
init(Future, !IO) :-
|
||||
impure init(Future).
|
||||
|
||||
:- impure pred init(future_io(T)::uo) is det.
|
||||
|
||||
init(future_io(Ready, Wait, Value)) :-
|
||||
impure new_mutvar(not_ready, Ready),
|
||||
impure semaphore.impure_init(Wait),
|
||||
impure new_mutvar0(Value).
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
:- pragma promise_pure(signal/4).
|
||||
|
||||
signal(Future, Value, !IO) :-
|
||||
impure signal(Future, Value).
|
||||
|
||||
:- impure pred signal(future_io(T)::in, T::in) is det.
|
||||
|
||||
signal(future_io(MReady, Wait, MValue), Value) :-
|
||||
impure get_mutvar(MReady, Ready),
|
||||
(
|
||||
Ready = not_ready,
|
||||
impure set_mutvar(MValue, Value),
|
||||
% TODO: Implement signal_all.
|
||||
impure semaphore.impure_signal(Wait),
|
||||
% We must write MReady _after_ signaling the semaphore. The signal
|
||||
% provides a memory barrier that ensures that the write to MReady
|
||||
% occurs after MValue. This ensures that the optimisation in wait/4
|
||||
% will read the future consistently.
|
||||
impure set_mutvar(MReady, ready)
|
||||
;
|
||||
Ready = ready,
|
||||
% It is possible that another thread has called signal/2 but we read
|
||||
% Ready before it wrote it, resulting in multiple calls to signal/2.
|
||||
% Therefore we do not guarantee that we will always detect multiple
|
||||
% calls and will not always throw this exception.
|
||||
error("Multiple calls to thread.future.signal/2")
|
||||
).
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
wait(Future, Value, !IO) :-
|
||||
wait(Future, Value).
|
||||
|
||||
% Wait is pure because it always returns the same value for the same
|
||||
% future (if it terminates).
|
||||
%
|
||||
:- pred wait(future_io(T)::in, T::out) is det.
|
||||
:- pragma promise_pure(wait/2).
|
||||
|
||||
wait(Future, Value) :-
|
||||
Future = future_io(MReady, Wait, MValue),
|
||||
impure get_mutvar(MReady, Ready),
|
||||
(
|
||||
Ready = ready
|
||||
% No wait necessary
|
||||
;
|
||||
Ready = not_ready,
|
||||
% We need to wait, this will probably block.
|
||||
impure semaphore.impure_wait(Wait),
|
||||
% Signal the semaphore to release the next waiting thread.
|
||||
impure semaphore.impure_signal(Wait)
|
||||
),
|
||||
impure get_mutvar(MValue, Value).
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
%
|
||||
% Copied from exception.m
|
||||
%
|
||||
|
||||
:- impure pred make_io_state(io::uo) is det.
|
||||
:- pragma foreign_proc("C", make_io_state(_IO::uo),
|
||||
[will_not_call_mercury, thread_safe, will_not_modify_trail, no_sharing],
|
||||
"").
|
||||
:- pragma foreign_proc("C#", make_io_state(_IO::uo),
|
||||
[will_not_call_mercury, thread_safe], "").
|
||||
:- pragma foreign_proc("Java", make_io_state(_IO::uo),
|
||||
[will_not_call_mercury, thread_safe], "").
|
||||
:- pragma foreign_proc("Erlang", make_io_state(_IO::uo),
|
||||
[will_not_call_mercury, thread_safe], "void").
|
||||
|
||||
:- impure pred consume_io_state(io::di) is det.
|
||||
:- pragma foreign_proc("C",
|
||||
consume_io_state(_IO::di),
|
||||
[will_not_call_mercury, thread_safe, no_sharing], "").
|
||||
:- pragma foreign_proc("C#",
|
||||
consume_io_state(_IO::di),
|
||||
[will_not_call_mercury, thread_safe], "").
|
||||
:- pragma foreign_proc("Java",
|
||||
consume_io_state(_IO::di),
|
||||
[will_not_call_mercury, thread_safe], "").
|
||||
:- pragma foreign_proc("Erlang",
|
||||
consume_io_state(_IO::di),
|
||||
[will_not_call_mercury, thread_safe], "void").
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
@@ -1,8 +1,9 @@
|
||||
%-----------------------------------------------------------------------------%
|
||||
% vim: ft=mercury ts=4 sw=4 et
|
||||
%-----------------------------------------------------------------------------%
|
||||
% Copyright (C) 2000-2001, 2003-2004, 2006-2008, 2010-2011, 2014 The
|
||||
% University of Melbourne.
|
||||
% Copyright (C) 2000-2001, 2003-2004, 2006-2008, 2010-2011 The University
|
||||
% of Melbourne.
|
||||
% Copyright (C) 2014 The Mercury Team.
|
||||
% This file may only be copied under the terms of the GNU Library General
|
||||
% Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
%-----------------------------------------------------------------------------%
|
||||
@@ -28,6 +29,7 @@
|
||||
|
||||
:- include_module barrier.
|
||||
:- include_module channel.
|
||||
:- include_module future.
|
||||
:- include_module mvar.
|
||||
:- include_module semaphore.
|
||||
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
% vim: ft=mercury ts=4 sw=4 et
|
||||
%-----------------------------------------------------------------------------%
|
||||
% Copyright (C) 2000-2003, 2006-2007, 2011 The University of Melbourne.
|
||||
% Copyright (C) 2014 The Mercury Team.
|
||||
% This file may only be copied under the terms of the GNU Library General
|
||||
% Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
%-----------------------------------------------------------------------------%
|
||||
@@ -90,8 +91,8 @@ mvar.init(Mvar, !IO) :-
|
||||
).
|
||||
|
||||
mvar.init = mvar(Full, Empty, Ref) :-
|
||||
impure Full = semaphore.init(0),
|
||||
impure Empty = semaphore.init(1), % Initially a mvar starts empty.
|
||||
impure semaphore.impure_init(0, Full),
|
||||
impure semaphore.impure_init(1, Empty), % Initially a mvar starts empty.
|
||||
impure new_mutvar0(Ref).
|
||||
|
||||
mvar.take(mvar(Full, Empty, Ref), Data, !IO) :-
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
% vim: ft=mercury ts=4 sw=4 et
|
||||
%-----------------------------------------------------------------------------%
|
||||
% Copyright (C) 2000-2001,2003-2004, 2006-2007, 2009-2011 The University of Melbourne.
|
||||
% Copyright (C) 2014 The Mercury Team.
|
||||
% This file may only be copied under the terms of the GNU Library General
|
||||
% Public License - see the file COPYING.LIB in the Mercury distribution.
|
||||
%-----------------------------------------------------------------------------%
|
||||
@@ -11,7 +12,7 @@
|
||||
% Stability: medium.
|
||||
%
|
||||
% This module implements a simple semaphore data type for allowing
|
||||
% coroutines to synchronise with one another.
|
||||
% threads to synchronise with one another.
|
||||
%
|
||||
% The operations in this module are no-ops in the hlc grades that do not
|
||||
% contain a .par component.
|
||||
@@ -28,40 +29,103 @@
|
||||
|
||||
:- type semaphore.
|
||||
|
||||
% init(Count, Sem, !IO) creates a new semaphore `Sem' with its counter
|
||||
% initialized to `Count'.
|
||||
%
|
||||
:- pred init(int::in, semaphore::uo, io::di, io::uo) is det.
|
||||
|
||||
% init(Sem, !IO) creates a new semaphore `Sem' with its counter
|
||||
% initialized to 0.
|
||||
%
|
||||
:- pred semaphore.init(semaphore::out, io::di, io::uo) is det.
|
||||
:- pred init(semaphore::uo, io::di, io::uo) is det.
|
||||
|
||||
% Returns a new semaphore `Sem' with its counter initialized to Count.
|
||||
% Sem = init(Count) returns a new semaphore `Sem' with its counter
|
||||
% initialized to `Count'.
|
||||
%
|
||||
:- impure func semaphore.init(int::in) = (semaphore::uo) is det.
|
||||
:- impure func impure_init(int::in) = (semaphore::uo) is det.
|
||||
|
||||
% Sem = init(Count) returns a new semaphore `Sem' with its counter
|
||||
% initialized to `Count'.
|
||||
%
|
||||
% This has been renamed to impure_init.
|
||||
%
|
||||
:- impure func init(int::in) = (semaphore::uo) is det.
|
||||
:- pragma obsolete(init/1).
|
||||
|
||||
% wait(Sem, !IO) blocks until the counter associated with `Sem'
|
||||
% becomes greater than 0, whereupon it wakes, decrements the
|
||||
% counter and returns.
|
||||
%
|
||||
:- pred semaphore.wait(semaphore::in, io::di, io::uo) is det.
|
||||
:- pred wait(semaphore::in, io::di, io::uo) is det.
|
||||
|
||||
% try_wait(Sem, Succ, !IO) is the same as wait/3, except that
|
||||
% instead of blocking, it binds `Succ' to a boolean indicating
|
||||
% whether the call succeeded in obtaining the semaphore or not.
|
||||
%
|
||||
:- pred semaphore.try_wait(semaphore::in, bool::out, io::di, io::uo) is det.
|
||||
:- pred try_wait(semaphore::in, bool::out, io::di, io::uo) is det.
|
||||
|
||||
% signal(Sem, !IO) increments the counter associated with `Sem'
|
||||
% and if the resulting counter has a value greater than 0, it wakes
|
||||
% one or more coroutines that are waiting on this semaphore (if
|
||||
% one or more threads that are waiting on this semaphore (if
|
||||
% any).
|
||||
%
|
||||
:- pred semaphore.signal(semaphore::in, io::di, io::uo) is det.
|
||||
:- pred signal(semaphore::in, io::di, io::uo) is det.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
:- implementation.
|
||||
|
||||
init(Count, Semaphore, !IO) :-
|
||||
promise_pure (
|
||||
impure impure_init(Count, Semaphore)
|
||||
).
|
||||
|
||||
init(Semaphore, !IO) :-
|
||||
init(0, Semaphore, !IO).
|
||||
|
||||
impure_init(Count) = Semaphore :-
|
||||
impure impure_init(Count, Semaphore).
|
||||
|
||||
init(Count) = Semaphore :-
|
||||
impure impure_init(Count, Semaphore).
|
||||
|
||||
signal(Semaphore, !IO) :-
|
||||
promise_pure (
|
||||
impure impure_signal(Semaphore),
|
||||
!:IO = !.IO
|
||||
).
|
||||
|
||||
wait(Semaphore, !IO) :-
|
||||
promise_pure (
|
||||
impure impure_wait(Semaphore),
|
||||
!:IO = !.IO
|
||||
).
|
||||
|
||||
try_wait(Sem, Res, !IO) :-
|
||||
promise_pure (
|
||||
impure impure_try_wait(Sem, Res)
|
||||
).
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
:- interface.
|
||||
|
||||
% The semaphore operations above can be used without the I/O state in impure
|
||||
% code. These predicates are provided for use by implementors.
|
||||
|
||||
:- impure pred impure_init(int::in, semaphore::uo) is det.
|
||||
|
||||
:- impure pred impure_init(semaphore::uo) is det.
|
||||
|
||||
:- impure pred impure_wait(semaphore::in) is det.
|
||||
|
||||
:- impure pred impure_try_wait(semaphore::in, bool::out) is det.
|
||||
|
||||
:- impure pred impure_signal(semaphore::in) is det.
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
%-----------------------------------------------------------------------------%
|
||||
:- implementation.
|
||||
|
||||
:- pragma foreign_decl("C", "
|
||||
#include <stdio.h>
|
||||
@@ -106,13 +170,11 @@ ML_finalize_semaphore(void *obj, void *cd);
|
||||
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
init(Semaphore, !IO) :-
|
||||
promise_pure (
|
||||
impure Semaphore = init(0)
|
||||
).
|
||||
impure_init(Semaphore) :-
|
||||
impure impure_init(0, Semaphore).
|
||||
|
||||
:- pragma foreign_proc("C",
|
||||
init(Count::in) = (Semaphore::uo),
|
||||
impure_init(Count::in, Semaphore::uo),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
ML_Semaphore *sem;
|
||||
@@ -142,7 +204,7 @@ init(Semaphore, !IO) :-
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("C#",
|
||||
init(Count::in) = (Semaphore::uo),
|
||||
impure_init(Count::in, Semaphore::uo),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
Semaphore = new thread__semaphore.ML_Semaphore();
|
||||
@@ -150,7 +212,7 @@ init(Semaphore, !IO) :-
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("Java",
|
||||
init(Count::in) = (Semaphore::uo),
|
||||
impure_init(Count::in, Semaphore::uo),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
Semaphore = new jmercury.runtime.Semaphore(Count);
|
||||
@@ -174,16 +236,18 @@ ML_finalize_semaphore(void *obj, void *cd)
|
||||
}
|
||||
").
|
||||
|
||||
% semaphore.signal causes the calling context to resume in semaphore.nop,
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
% impure_signal causes the calling context to resume in semaphore.nop,
|
||||
% which simply jumps to the succip. That will return control to the caller
|
||||
% of semaphore.signal as intended, but not if this procedure is inlined.
|
||||
%
|
||||
% XXX get rid of this limitation at some stage.
|
||||
%
|
||||
:- pragma no_inline(semaphore.signal/3).
|
||||
:- pragma no_inline(semaphore.impure_signal/1).
|
||||
:- pragma foreign_proc("C",
|
||||
signal(Semaphore::in, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_signal(Semaphore::in),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
ML_Semaphore *sem;
|
||||
#ifndef MR_HIGHLEVEL_CODE
|
||||
@@ -254,8 +318,8 @@ ML_finalize_semaphore(void *obj, void *cd)
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("C#",
|
||||
signal(Semaphore::in, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_signal(Semaphore::in),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
System.Threading.Monitor.Enter(Semaphore);
|
||||
Semaphore.count++;
|
||||
@@ -265,22 +329,24 @@ ML_finalize_semaphore(void *obj, void *cd)
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("Java",
|
||||
signal(Semaphore::in, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_signal(Semaphore::in),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
Semaphore.release();
|
||||
").
|
||||
|
||||
% semaphore.wait causes the calling context to resume in semaphore.nop,
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
% impure_wait causes the calling context to resume in semaphore.nop,
|
||||
% which simply jumps to the succip. That will return control to the caller
|
||||
% of semaphore.wait as intended, but not if this procedure is inlined.
|
||||
%
|
||||
% XXX get rid of this limitation at some stage.
|
||||
%
|
||||
:- pragma no_inline(semaphore.wait/3).
|
||||
:- pragma no_inline(impure_wait/1).
|
||||
:- pragma foreign_proc("C",
|
||||
wait(Semaphore::in, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_wait(Semaphore::in),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
ML_Semaphore *sem;
|
||||
#ifndef MR_HIGHLEVEL_CODE
|
||||
@@ -342,8 +408,8 @@ ML_finalize_semaphore(void *obj, void *cd)
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("C#",
|
||||
wait(Semaphore::in, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_wait(Semaphore::in),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
System.Threading.Monitor.Enter(Semaphore);
|
||||
|
||||
@@ -357,8 +423,8 @@ ML_finalize_semaphore(void *obj, void *cd)
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("Java",
|
||||
wait(Semaphore::in, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_wait(Semaphore::in),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
/*
|
||||
** acquire() might be useful as well; it will throw an exception if the
|
||||
@@ -367,15 +433,17 @@ ML_finalize_semaphore(void *obj, void *cd)
|
||||
Semaphore.acquireUninterruptibly();
|
||||
").
|
||||
|
||||
semaphore.try_wait(Sem, Res, !IO) :-
|
||||
try_wait_2(Sem, Res0, !IO),
|
||||
%-----------------------------------------------------------------------------%
|
||||
|
||||
impure_try_wait(Sem, Res) :-
|
||||
impure impure_try_wait_2(Sem, Res0),
|
||||
Res = ( Res0 = 0 -> yes ; no ).
|
||||
|
||||
:- pred try_wait_2(semaphore::in, int::out, io::di, io::uo) is det.
|
||||
:- impure pred impure_try_wait_2(semaphore::in, int::out) is det.
|
||||
|
||||
:- pragma foreign_proc("C",
|
||||
try_wait_2(Semaphore::in, Res::out, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_try_wait_2(Semaphore::in, Res::out),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
ML_Semaphore *sem;
|
||||
|
||||
@@ -393,8 +461,8 @@ semaphore.try_wait(Sem, Res, !IO) :-
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("C#",
|
||||
try_wait_2(Semaphore::in, Res::out, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_try_wait_2(Semaphore::in, Res::out),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
if (System.Threading.Monitor.TryEnter(Semaphore)) {
|
||||
if (Semaphore.count > 0) {
|
||||
@@ -411,8 +479,8 @@ semaphore.try_wait(Sem, Res, !IO) :-
|
||||
").
|
||||
|
||||
:- pragma foreign_proc("Java",
|
||||
try_wait_2(Semaphore::in, Res::out, _IO0::di, _IO::uo),
|
||||
[promise_pure, will_not_call_mercury, thread_safe, tabled_for_io],
|
||||
impure_try_wait_2(Semaphore::in, Res::out),
|
||||
[will_not_call_mercury, thread_safe],
|
||||
"
|
||||
Res = Semaphore.tryAcquire() ? 0 : 1;
|
||||
").
|
||||
|
||||
Reference in New Issue
Block a user