diff --git a/src/alcove_drv.erl b/src/alcove_drv.erl index d5062ba..c17a754 100644 --- a/src/alcove_drv.erl +++ b/src/alcove_drv.erl @@ -18,7 +18,8 @@ -export([start/0, start/1, stop/1]). -export([call/2, call/3, call/4, cast/2, encode/2, encode/3]). -export([stdin/3, stdout/3, stderr/3, event/4]). --export([msg/2]). +-export([atom_to_type/1, type_to_atom/1]). +-export([msg/2, events/4]). -export([getopts/1]). -export_type([reply/0]). @@ -59,10 +60,24 @@ cast(Port, Data) -> send(Port, Data, Size) when is_port(Port), Size < 16#ffff -> erlang:port_command(Port, Data). --spec event(port(),[integer()],non_neg_integer(),'infinity' | non_neg_integer()) -> reply(). -event(Port, Pids, Type, Timeout) when is_atom(Type) -> - event(Port, Pids, atom_to_type(Type), Timeout); -event(Port, [], Type, Timeout) -> +-spec event(port(),[integer()],non_neg_integer(), + 'infinity' | non_neg_integer()) -> reply(). + +% Check the mailbox for processed events +event(Port, Pids, Type, Timeout) when is_integer(Type) -> + Tag = type_to_atom(Type), + receive + {Port, {Tag, Pids, Data}} -> + Data + after + 0 -> + event_1(Port, Pids, Type, Timeout) + end. + +% Check for messages from the port + +% Reply from the port: no message length +event_1(Port, [], Type, Timeout) -> receive {Port, {data, <>}} -> binary_to_term(Reply) @@ -70,72 +85,192 @@ event(Port, [], Type, Timeout) -> Timeout -> false end; -event(Port, [Pid0], Type, Timeout) -> + +% Reply from a child process. +% +% The parent process may coalesce 2 writes from the child into 1 read. The +% parent could read the length header then read length bytes except that +% the child may have called execvp(). After calling exec(), the data +% returned from the child will not contain a length header. +% +% Work around this by converting the reply into a list of messages. The +% first message matching the requested type is returned to the caller. The +% remaining messages are pushed back into the process' mailbox. +event_1(Port, [Pid0] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1], Type, Timeout) -> +event_1(Port, [Pid0,Pid1] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1, Pid2], Type, Timeout) -> +event_1(Port, [Pid0,Pid1,Pid2] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), - ?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1, Pid2, Pid3], Type, Timeout) -> +event_1(Port, [Pid0,Pid1,Pid2,Pid3] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), - ?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), - ?UINT16(_Len3), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1, Pid2, Pid3, Pid4], Type, Timeout) -> +event_1(Port, [Pid0,Pid1,Pid2,Pid3,Pid4] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), - ?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), - ?UINT16(_Len3), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), - ?UINT16(_Len4), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end. +events(Port, Pids, Type, Reply) -> + events(Port, Pids, Type, Reply, []). + +events(Port, _Pids, ReqType, <<>>, Acc0) -> + Tag = type_to_atom(ReqType), + Acc = lists:reverse(Acc0), + Event = lists:keyfind(Tag, 1, Acc), + Events = lists:keydelete(Tag, 1, Acc), + Self = self(), + [ Self ! {Port, E} || E <- Events ], + case Event of + false -> + false; + {Tag, _, Data} -> + Data + end; +events(Port, Pids, ReqType, + <>, Acc) -> + % length includes the message type field + Bytes = Len - 2, + <> = Reply, + events(Port, Pids, ReqType, Rest, + [{type_to_atom(Type), Pids, binary_to_term(Bin)}|Acc]). + -spec stdin(port(),[integer()],iodata()) -> 'true'. stdin(Port, [], Data) -> cast(Port, Data); @@ -358,11 +493,17 @@ find_executable(Exe) -> N end. -atom_to_type(call) -> ?ALCOVE_MSG_CALL; -atom_to_type(event) -> ?ALCOVE_MSG_EVENT; -atom_to_type(stdin) -> ?ALCOVE_MSG_STDIN; -atom_to_type(stdout) -> ?ALCOVE_MSG_STDOUT; -atom_to_type(stderr) -> ?ALCOVE_MSG_STDERR. +atom_to_type(alcove_call) -> ?ALCOVE_MSG_CALL; +atom_to_type(alcove_event) -> ?ALCOVE_MSG_EVENT; +atom_to_type(alcove_stdin) -> ?ALCOVE_MSG_STDIN; +atom_to_type(alcove_stdout) -> ?ALCOVE_MSG_STDOUT; +atom_to_type(alcove_stderr) -> ?ALCOVE_MSG_STDERR. + +type_to_atom(?ALCOVE_MSG_CALL) -> alcove_call; +type_to_atom(?ALCOVE_MSG_EVENT) -> alcove_event; +type_to_atom(?ALCOVE_MSG_STDIN) -> alcove_stdin; +type_to_atom(?ALCOVE_MSG_STDOUT) -> alcove_stdout; +type_to_atom(?ALCOVE_MSG_STDERR) -> alcove_stderr. basedir(Module) -> case code:priv_dir(Module) of diff --git a/test/alcove_tests.erl b/test/alcove_tests.erl index 63e49e5..955dde5 100644 --- a/test/alcove_tests.erl +++ b/test/alcove_tests.erl @@ -29,6 +29,7 @@ alcove_test_() -> run(State) -> % Test order must be maintained [ + msg(State), version(State), pid(State), getpid(State), @@ -44,6 +45,8 @@ run(State) -> setuid(State), fork(State), signal(State), + portstress(State), + forkstress(State), prctl(State), execvp(State), stdout(State), @@ -71,6 +74,33 @@ start() -> stop({_, Port, _Child}) -> alcove_drv:stop(Port). +msg({_, Port, Child}) -> + % Length, Message type, Term + Msg = <<0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17, + 0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17, + 0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17, + 0,13, 0,0, 131,109,0,0,0,5,48,46,50,46,48, + 0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17>>, + + Reply = alcove_drv:events(Port, [Child], ?ALCOVE_MSG_CALL, Msg), + + Sig1 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig2 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig3 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig4 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig5 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + + [ + ?_assertEqual(<<"0.2.0">>, Reply), + + ?_assertEqual({signal,17}, Sig1), + ?_assertEqual({signal,17}, Sig2), + ?_assertEqual({signal,17}, Sig3), + ?_assertEqual({signal,17}, Sig4), + ?_assertEqual(false, Sig5) + + ]. + version({_, Port, _Child}) -> Version = alcove:version(Port), ?_assertEqual(true, is_binary(Version)). @@ -228,6 +258,19 @@ signal({_, Port, _Child}) -> ?_assertEqual({error,esrch}, Search) ]. +portstress({_, Port, Child}) -> + Reply = [ alcove:version(Port, [Child]) || _ <- lists:seq(1,1000) ], + Ok = lists:filter(fun + (false) -> false; + (_) -> true + end, Reply), + ?_assertEqual(Ok, Reply). + +forkstress({_, Port, _Child}) -> + {ok, Fork} = alcove:fork(Port), + Reply = forkstress_1(Port, Fork, 100), + ?_assertEqual(ok, Reply). + prctl({linux, Port, _Child}) -> {ok, Fork} = alcove:fork(Port), @@ -315,3 +358,15 @@ flush(stdout, Port, Pids) -> _ -> flush(stdout, Port, Pids) end. + +forkstress_1(_Port, _Child, 0) -> + ok; +forkstress_1(Port, Child, N) -> + {ok, Fork} = alcove:fork(Port, [Child]), + ok = alcove:kill(Port, [Child], Fork, 15), + case alcove:version(Port, [Child]) of + {error, timedout} -> + fail; + _ -> + forkstress_1(Port, Child, N-1) + end.