From d7ede5f736e4165654186b8ae37e248026deed18 Mon Sep 17 00:00:00 2001 From: Michael Santos Date: Tue, 18 Mar 2014 09:11:45 -0400 Subject: [PATCH] Handle coalesced messages from the port The port returns a message prefaced with a 2 byte length header. When the port is reading the stdin of the child process, it ignores the length header because the child process may or may not have called execvp(), i.e., the port treats the child processes as streams. The child process may reply in 2 ways: as the response to a call (initiated from the erlang side) or it may originate an event (for example, trapping a signal). If these events occur at about the same time, the child will issue 2 writes but the parent may read the data in 1 read. (There may be multiple events returned or multiple events and a call but there shouldn't be multiple calls returned in a message because calls are synchronous). Fix this by parsing the payload into terms. The first message matching the type requested by the caller is returned. The other messages are pushed back into the caller's mailbox. Problems: * the timeout is not absolute. Every time a message is received, it is parsed and matched against the message type. If the message is not a match, the process will block in receive with timeout. * probably better handled by using an intermediary process. For example, having a gen_server control the port. --- src/alcove_drv.erl | 211 +++++++++++++++++++++++++++++++++++------- test/alcove_tests.erl | 55 +++++++++++ 2 files changed, 231 insertions(+), 35 deletions(-) diff --git a/src/alcove_drv.erl b/src/alcove_drv.erl index d5062ba..c17a754 100644 --- a/src/alcove_drv.erl +++ b/src/alcove_drv.erl @@ -18,7 +18,8 @@ -export([start/0, start/1, stop/1]). -export([call/2, call/3, call/4, cast/2, encode/2, encode/3]). -export([stdin/3, stdout/3, stderr/3, event/4]). --export([msg/2]). +-export([atom_to_type/1, type_to_atom/1]). +-export([msg/2, events/4]). -export([getopts/1]). -export_type([reply/0]). @@ -59,10 +60,24 @@ cast(Port, Data) -> send(Port, Data, Size) when is_port(Port), Size < 16#ffff -> erlang:port_command(Port, Data). --spec event(port(),[integer()],non_neg_integer(),'infinity' | non_neg_integer()) -> reply(). -event(Port, Pids, Type, Timeout) when is_atom(Type) -> - event(Port, Pids, atom_to_type(Type), Timeout); -event(Port, [], Type, Timeout) -> +-spec event(port(),[integer()],non_neg_integer(), + 'infinity' | non_neg_integer()) -> reply(). + +% Check the mailbox for processed events +event(Port, Pids, Type, Timeout) when is_integer(Type) -> + Tag = type_to_atom(Type), + receive + {Port, {Tag, Pids, Data}} -> + Data + after + 0 -> + event_1(Port, Pids, Type, Timeout) + end. + +% Check for messages from the port + +% Reply from the port: no message length +event_1(Port, [], Type, Timeout) -> receive {Port, {data, <>}} -> binary_to_term(Reply) @@ -70,72 +85,192 @@ event(Port, [], Type, Timeout) -> Timeout -> false end; -event(Port, [Pid0], Type, Timeout) -> + +% Reply from a child process. +% +% The parent process may coalesce 2 writes from the child into 1 read. The +% parent could read the length header then read length bytes except that +% the child may have called execvp(). After calling exec(), the data +% returned from the child will not contain a length header. +% +% Work around this by converting the reply into a list of messages. The +% first message matching the requested type is returned to the caller. The +% remaining messages are pushed back into the process' mailbox. +event_1(Port, [Pid0] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1], Type, Timeout) -> +event_1(Port, [Pid0,Pid1] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1, Pid2], Type, Timeout) -> +event_1(Port, [Pid0,Pid1,Pid2] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), - ?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1, Pid2, Pid3], Type, Timeout) -> +event_1(Port, [Pid0,Pid1,Pid2,Pid3] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), - ?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), - ?UINT16(_Len3), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end; -event(Port, [Pid0, Pid1, Pid2, Pid3, Pid4], Type, Timeout) -> +event_1(Port, [Pid0,Pid1,Pid2,Pid3,Pid4] = Pids, Type, Timeout) -> receive {Port, {data, << ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), - ?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), - ?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), - ?UINT16(_Len3), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), - ?UINT16(_Len4), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), ?UINT16(Len), ?UINT16(Type), Reply/binary - >>}} when Len =:= 2 + byte_size(Reply) -> - binary_to_term(Reply) + >>}} when Len =:= 2 + byte_size(Reply) -> + binary_to_term(Reply); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} when Len =:= 2 + byte_size(Reply) -> + events(Port, Pids, Type, + <>), + event_1(Port, Pids, Type, Timeout); + {Port, {data, << + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3), + ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4), + ?UINT16(Len), ?UINT16(Type1), Reply/binary + >>}} -> + events(Port, Pids, Type, + <>) after Timeout -> false end. +events(Port, Pids, Type, Reply) -> + events(Port, Pids, Type, Reply, []). + +events(Port, _Pids, ReqType, <<>>, Acc0) -> + Tag = type_to_atom(ReqType), + Acc = lists:reverse(Acc0), + Event = lists:keyfind(Tag, 1, Acc), + Events = lists:keydelete(Tag, 1, Acc), + Self = self(), + [ Self ! {Port, E} || E <- Events ], + case Event of + false -> + false; + {Tag, _, Data} -> + Data + end; +events(Port, Pids, ReqType, + <>, Acc) -> + % length includes the message type field + Bytes = Len - 2, + <> = Reply, + events(Port, Pids, ReqType, Rest, + [{type_to_atom(Type), Pids, binary_to_term(Bin)}|Acc]). + -spec stdin(port(),[integer()],iodata()) -> 'true'. stdin(Port, [], Data) -> cast(Port, Data); @@ -358,11 +493,17 @@ find_executable(Exe) -> N end. -atom_to_type(call) -> ?ALCOVE_MSG_CALL; -atom_to_type(event) -> ?ALCOVE_MSG_EVENT; -atom_to_type(stdin) -> ?ALCOVE_MSG_STDIN; -atom_to_type(stdout) -> ?ALCOVE_MSG_STDOUT; -atom_to_type(stderr) -> ?ALCOVE_MSG_STDERR. +atom_to_type(alcove_call) -> ?ALCOVE_MSG_CALL; +atom_to_type(alcove_event) -> ?ALCOVE_MSG_EVENT; +atom_to_type(alcove_stdin) -> ?ALCOVE_MSG_STDIN; +atom_to_type(alcove_stdout) -> ?ALCOVE_MSG_STDOUT; +atom_to_type(alcove_stderr) -> ?ALCOVE_MSG_STDERR. + +type_to_atom(?ALCOVE_MSG_CALL) -> alcove_call; +type_to_atom(?ALCOVE_MSG_EVENT) -> alcove_event; +type_to_atom(?ALCOVE_MSG_STDIN) -> alcove_stdin; +type_to_atom(?ALCOVE_MSG_STDOUT) -> alcove_stdout; +type_to_atom(?ALCOVE_MSG_STDERR) -> alcove_stderr. basedir(Module) -> case code:priv_dir(Module) of diff --git a/test/alcove_tests.erl b/test/alcove_tests.erl index 63e49e5..955dde5 100644 --- a/test/alcove_tests.erl +++ b/test/alcove_tests.erl @@ -29,6 +29,7 @@ alcove_test_() -> run(State) -> % Test order must be maintained [ + msg(State), version(State), pid(State), getpid(State), @@ -44,6 +45,8 @@ run(State) -> setuid(State), fork(State), signal(State), + portstress(State), + forkstress(State), prctl(State), execvp(State), stdout(State), @@ -71,6 +74,33 @@ start() -> stop({_, Port, _Child}) -> alcove_drv:stop(Port). +msg({_, Port, Child}) -> + % Length, Message type, Term + Msg = <<0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17, + 0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17, + 0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17, + 0,13, 0,0, 131,109,0,0,0,5,48,46,50,46,48, + 0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17>>, + + Reply = alcove_drv:events(Port, [Child], ?ALCOVE_MSG_CALL, Msg), + + Sig1 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig2 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig3 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig4 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + Sig5 = alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0), + + [ + ?_assertEqual(<<"0.2.0">>, Reply), + + ?_assertEqual({signal,17}, Sig1), + ?_assertEqual({signal,17}, Sig2), + ?_assertEqual({signal,17}, Sig3), + ?_assertEqual({signal,17}, Sig4), + ?_assertEqual(false, Sig5) + + ]. + version({_, Port, _Child}) -> Version = alcove:version(Port), ?_assertEqual(true, is_binary(Version)). @@ -228,6 +258,19 @@ signal({_, Port, _Child}) -> ?_assertEqual({error,esrch}, Search) ]. +portstress({_, Port, Child}) -> + Reply = [ alcove:version(Port, [Child]) || _ <- lists:seq(1,1000) ], + Ok = lists:filter(fun + (false) -> false; + (_) -> true + end, Reply), + ?_assertEqual(Ok, Reply). + +forkstress({_, Port, _Child}) -> + {ok, Fork} = alcove:fork(Port), + Reply = forkstress_1(Port, Fork, 100), + ?_assertEqual(ok, Reply). + prctl({linux, Port, _Child}) -> {ok, Fork} = alcove:fork(Port), @@ -315,3 +358,15 @@ flush(stdout, Port, Pids) -> _ -> flush(stdout, Port, Pids) end. + +forkstress_1(_Port, _Child, 0) -> + ok; +forkstress_1(Port, Child, N) -> + {ok, Fork} = alcove:fork(Port, [Child]), + ok = alcove:kill(Port, [Child], Fork, 15), + case alcove:version(Port, [Child]) of + {error, timedout} -> + fail; + _ -> + forkstress_1(Port, Child, N-1) + end.