From 36e1e26b33e14feffb8127ea8112d53b0c4c94d5 Mon Sep 17 00:00:00 2001 From: Peter Wang Date: Tue, 18 Feb 2020 11:07:34 +1100 Subject: [PATCH] Add test for thread.closeable_channel. tests/hard_coded/Mmakefile: tests/hard_coded/closeable_channel_test.exp: tests/hard_coded/closeable_channel_test.exp2: tests/hard_coded/closeable_channel_test.m: Add new test. --- tests/hard_coded/Mmakefile | 1 + tests/hard_coded/closeable_channel_test.exp | 33 +++ tests/hard_coded/closeable_channel_test.exp2 | 1 + tests/hard_coded/closeable_channel_test.m | 246 +++++++++++++++++++ 4 files changed, 281 insertions(+) create mode 100644 tests/hard_coded/closeable_channel_test.exp create mode 100644 tests/hard_coded/closeable_channel_test.exp2 create mode 100644 tests/hard_coded/closeable_channel_test.m diff --git a/tests/hard_coded/Mmakefile b/tests/hard_coded/Mmakefile index 1a45d7e62..d3644353d 100644 --- a/tests/hard_coded/Mmakefile +++ b/tests/hard_coded/Mmakefile @@ -57,6 +57,7 @@ ORDINARY_PROGS = \ char_unicode \ checked_nondet_tailcall \ checked_nondet_tailcall_noinline \ + closeable_channel_test \ closure_extension \ common_type_cast \ compare_spec \ diff --git a/tests/hard_coded/closeable_channel_test.exp b/tests/hard_coded/closeable_channel_test.exp new file mode 100644 index 000000000..3f8888ffb --- /dev/null +++ b/tests/hard_coded/closeable_channel_test.exp @@ -0,0 +1,33 @@ +main: starting worker threads +main: try_take... +main: try_take - empty channel +main: put 1... +main: put 1 +main: put 2... +main: put 2 +main: put 3... +main: put 3 +main: put 4... +main: put 4 +main: put 5... +main: put 5 +main: put 6... +main: put 6 +main: put 7... +main: put 7 +main: put 8... +main: put 8 +main: put 9... +main: put 9 +main: closing channel... +main: closed channel +main: waiting for worker threads to exit +main: workers exited +main: put 999... +main: put 999 - channel closed +main: closing channel... +main: closed channel +main: closing channel... +main: closed channel +outputs (a) = [1, 4, 9, 16, 25, 36, 49, 64, 81] +outputs (b) = [] diff --git a/tests/hard_coded/closeable_channel_test.exp2 b/tests/hard_coded/closeable_channel_test.exp2 new file mode 100644 index 000000000..85f8c0c08 --- /dev/null +++ b/tests/hard_coded/closeable_channel_test.exp2 @@ -0,0 +1 @@ +spawn/3 not supported in this grade diff --git a/tests/hard_coded/closeable_channel_test.m b/tests/hard_coded/closeable_channel_test.m new file mode 100644 index 000000000..cf747c7e0 --- /dev/null +++ b/tests/hard_coded/closeable_channel_test.m @@ -0,0 +1,246 @@ +%---------------------------------------------------------------------------% +% vim: ft=mercury ts=4 sw=4 et +%---------------------------------------------------------------------------% + +:- module closeable_channel_test. +:- interface. + +:- import_module io. + +:- pred main(io::di, io::uo) is cc_multi. + +%---------------------------------------------------------------------------% +%---------------------------------------------------------------------------% + +:- implementation. + +:- import_module bool. +:- import_module int. +:- import_module list. +:- import_module maybe. +:- import_module string. +:- import_module thread. +:- import_module thread.closeable_channel. +:- import_module thread.semaphore. + +%---------------------------------------------------------------------------% + +main(!IO) :- + ( if can_spawn then + run_test(!IO) + else + io.write_string("spawn/3 not supported in this grade\n", !IO) + ). + +:- pred run_test(io::di, io::uo) is cc_multi. + +run_test(!IO) :- + % Output from worker threads is disabled by default as it is + % non-deterministic. + io.get_environment_var("VERBOSE", Verbose, !IO), + ( + Verbose = yes(_), + LogA = log_on("huey"), + LogB = log_on("dewey"), + LogC = log_on("louie") + ; + Verbose = no, + LogA = log_off, + LogB = log_off, + LogC = log_off + ), + + closeable_channel.init(InCh, !IO), + closeable_channel.init(OutCh, !IO), + + semaphore.init(DoneA, !IO), + semaphore.init(DoneB, !IO), + semaphore.init(DoneC, !IO), + DoneSems = [DoneA, DoneB, DoneC], + + Log = log_on("main"), + log(Log, "starting worker threads", !IO), + + thread.spawn(thread_proc(LogA, InCh, OutCh, DoneA), !IO), + thread.spawn(thread_proc(LogB, InCh, OutCh, DoneB), !IO), + thread.spawn(thread_proc(LogC, InCh, OutCh, DoneC), !IO), + + % Test try_take from open and empty channel. + trace_try_take(Log, OutCh, _, !IO), + + % Write to open channel. + Inputs = 1 .. 9, + list.foldl(trace_put(Log, InCh), Inputs, !IO), + + % Close open channel. + trace_close(Log, InCh, !IO), + + % Check is_closed. + is_closed(InCh, IsClosed, !IO), + ( + IsClosed = yes + ; + IsClosed = no, + log(Log, "ERROR: channel not closed", !IO) + ), + + % Wait for threads to exit. + log(Log, "waiting for worker threads to exit", !IO), + list.foldl(semaphore.wait, DoneSems, !IO), + log(Log, "workers exited", !IO), + + % Write to closed channel. + trace_put(Log, InCh, 999, !IO), + + % Close output channel. + trace_close(Log, OutCh, !IO), + + % Read from closed channel. + channel_to_list(OutCh, OutputsA0, !IO), + sort(OutputsA0, OutputsA), + + % Closing a channel again has no effect. + trace_close(Log, OutCh, !IO), + + % Read from empty closed channel does not block. + channel_to_list(OutCh, OutputsB, !IO), + + io.write_string("outputs (a) = ", !IO), + io.write(OutputsA, !IO), + io.nl(!IO), + io.write_string("outputs (b) = ", !IO), + io.write(OutputsB, !IO), + io.nl(!IO). + +:- pred thread_proc(log::in, closeable_channel(int)::in, + closeable_channel(int)::in, semaphore::in, io::di, io::uo) is cc_multi. + +thread_proc(Log, InCh, OutCh, DoneSem, !IO) :- + log(Log, "thread start", !IO), + thread_loop(Log, InCh, OutCh, !IO), + log(Log, "thread exit", !IO), + semaphore.signal(DoneSem, !IO). + +:- pred thread_loop(log::in, closeable_channel(int)::in, + closeable_channel(int)::in, io::di, io::uo) is cc_multi. + +thread_loop(Log, InCh, OutCh, !IO) :- + trace_take(Log, InCh, TakeResult, !IO), + ( + TakeResult = ok(Input), + Output = Input * Input, + trace_put(Log, OutCh, Output, !IO), + thread_loop(Log, InCh, OutCh, !IO) + ; + TakeResult = closed + ). + +:- pred channel_to_list(closeable_channel(int)::in, list(int)::out, + io::di, io::uo) is det. + +channel_to_list(Ch, List, !IO) :- + closeable_channel.take(Ch, TakeResult, !IO), + ( + TakeResult = ok(Head), + channel_to_list(Ch, Tail, !IO), + List = [Head | Tail] + ; + TakeResult = closed, + List = [] + ). + +%---------------------------------------------------------------------------% + +:- pred trace_close(log::in, closeable_channel(int)::in, io::di, io::uo) + is det. + +trace_close(Log, Ch, !IO) :- + log(Log, "closing channel...", !IO), + closeable_channel.close(Ch, !IO), + log(Log, "closed channel", !IO). + +:- pred trace_put(log::in, closeable_channel(int)::in, int::in, + io::di, io::uo) is det. + +trace_put(Log, Ch, Item, !IO) :- + logf(Log, "put %d...", [i(Item)], !IO), + closeable_channel.put(Ch, Item, Success, !IO), + ( + Success = yes, + logf(Log, "put %d", [i(Item)], !IO) + ; + Success = no, + logf(Log, "put %d - channel closed", [i(Item)], !IO) + ). + +:- pred trace_take(log::in, closeable_channel(int)::in, + take_result(int)::out, io::di, io::uo) is det. + +trace_take(Log, Ch, TakeResult, !IO) :- + log(Log, "take...", !IO), + closeable_channel.take(Ch, TakeResult, !IO), + ( + TakeResult = ok(Item), + logf(Log, "take - got %d", [i(Item)], !IO) + ; + TakeResult = closed, + log(Log, "take - channel closed", !IO) + ). + +:- pred trace_try_take(log::in, closeable_channel(int)::in, + try_take_result(int)::out, io::di, io::uo) is det. + +trace_try_take(Log, Ch, TakeResult, !IO) :- + log(Log, "try_take...", !IO), + closeable_channel.try_take(Ch, TakeResult, !IO), + ( + TakeResult = ok(Item), + logf(Log, "try_take - got %d", [i(Item)], !IO) + ; + TakeResult = closed, + log(Log, "try_take - channel closed", !IO) + ; + TakeResult = empty, + log(Log, "try_take - empty channel", !IO) + ). + +%---------------------------------------------------------------------------% + +:- mutable(log_lock, semaphore, semaphore.impure_init(1), ground, + [untrailed, constant]). + +:- type log + ---> log_on(string) + ; log_off. + +:- pred log(log::in, string::in, io::di, io::uo) is det. + +log(Log, Str, !IO) :- + ( + Log = log_on(Label), + write_log(Label, Str, !IO) + ; + Log = log_off + ). + +:- pred logf(log::in, string::in, list(poly_type)::in, + io::di, io::uo) is det. + +logf(Log, Format, Args, !IO) :- + ( + Log = log_on(Label), + write_log(Label, string.format(Format, Args), !IO) + ; + Log = log_off + ). + +:- pred write_log(string::in, string::in, io::di, io::uo) is det. + +write_log(Label, Str, !IO) :- + get_log_lock(Lock), + semaphore.wait(Lock, !IO), + io.write_string(Label, !IO), + io.write_string(": ", !IO), + io.write_string(Str, !IO), + io.nl(!IO), + semaphore.signal(Lock, !IO).