Handle coalesced messages from the port

The port returns a message prefaced with a 2 byte length header. When
the port is reading the stdin of the child process, it ignores the
length header because the child process may or may not have called
execvp(), i.e., the port treats the child processes as streams.

The child process may reply in 2 ways: as the response to a call
(initiated from the erlang side) or it may originate an event (for
example, trapping a signal). If these events occur at about the same
time, the child will issue 2 writes but the parent may read the data in
1 read.

(There may be multiple events returned or multiple events and a call but
 there shouldn't be multiple calls returned in a message because calls
 are synchronous).

Fix this by parsing the payload into terms. The first message matching
the type requested by the caller is returned. The other messages are
pushed back into the caller's mailbox.

Problems:

* the timeout is not absolute. Every time a message is received, it is
  parsed and matched against the message type. If the message is not a
  match, the process blocks in receive again with the full timeout, so
  the total wait may exceed the timeout requested by the caller.

* this is probably better handled by using an intermediary process. For
  example, having a gen_server control the port.
This commit is contained in:
Michael Santos
2014-03-18 09:11:45 -04:00
parent 059aa5c98d
commit d7ede5f736
2 changed files with 231 additions and 35 deletions

View File

@@ -18,7 +18,8 @@
-export([start/0, start/1, stop/1]).
-export([call/2, call/3, call/4, cast/2, encode/2, encode/3]).
-export([stdin/3, stdout/3, stderr/3, event/4]).
-export([msg/2]).
-export([atom_to_type/1, type_to_atom/1]).
-export([msg/2, events/4]).
-export([getopts/1]).
-export_type([reply/0]).
@@ -59,10 +60,24 @@ cast(Port, Data) ->
send(Port, Data, Size) when is_port(Port), Size < 16#ffff ->
erlang:port_command(Port, Data).
-spec event(port(),[integer()],non_neg_integer(),'infinity' | non_neg_integer()) -> reply().
event(Port, Pids, Type, Timeout) when is_atom(Type) ->
event(Port, Pids, atom_to_type(Type), Timeout);
event(Port, [], Type, Timeout) ->
-spec event(port(), [integer()], non_neg_integer(),
        'infinity' | non_neg_integer()) -> reply().
% Wait for a message of the numeric type Type for the process chain Pids.
%
% Messages that were already decoded and pushed back into this process'
% mailbox have the shape {Port, {Tag, Pids, Data}}, where Tag is the atom
% form of Type. Poll for one of those first without blocking; only when
% none is queued, fall through to event_1/4, which reads raw data from
% the port using Timeout.
event(Port, Pids, Type, Timeout) when is_integer(Type) ->
    Queued = type_to_atom(Type),
    receive
        {Port, {Queued, Pids, Payload}} ->
            Payload
    after 0 ->
        event_1(Port, Pids, Type, Timeout)
    end.
% Check for messages from the port
% Reply from the port: no message length
event_1(Port, [], Type, Timeout) ->
receive
{Port, {data, <<?UINT16(Type), Reply/binary>>}} ->
binary_to_term(Reply)
@@ -70,72 +85,192 @@ event(Port, [], Type, Timeout) ->
Timeout ->
false
end;
event(Port, [Pid0], Type, Timeout) ->
% Reply from a child process.
%
% The parent process may coalesce 2 writes from the child into 1 read. The
% parent could read the length header then read length bytes except that
% the child may have called execvp(). After calling exec(), the data
% returned from the child will not contain a length header.
%
% Work around this by converting the reply into a list of messages. The
% first message matching the requested type is returned to the caller. The
% remaining messages are pushed back into the process' mailbox.
event_1(Port, [Pid0] = Pids, Type, Timeout) ->
receive
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(Len), ?UINT16(Type), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply)
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>),
event_1(Port, Pids, Type, Timeout);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>)
after
Timeout ->
false
end;
event(Port, [Pid0, Pid1], Type, Timeout) ->
event_1(Port, [Pid0,Pid1] = Pids, Type, Timeout) ->
receive
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(Len), ?UINT16(Type), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply)
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>),
event_1(Port, Pids, Type, Timeout);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>)
after
Timeout ->
false
end;
event(Port, [Pid0, Pid1, Pid2], Type, Timeout) ->
event_1(Port, [Pid0,Pid1,Pid2] = Pids, Type, Timeout) ->
receive
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(Len), ?UINT16(Type), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply)
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>),
event_1(Port, Pids, Type, Timeout);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>)
after
Timeout ->
false
end;
event(Port, [Pid0, Pid1, Pid2, Pid3], Type, Timeout) ->
event_1(Port, [Pid0,Pid1,Pid2,Pid3] = Pids, Type, Timeout) ->
receive
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(_Len3), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(Len), ?UINT16(Type), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply)
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>),
event_1(Port, Pids, Type, Timeout);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>)
after
Timeout ->
false
end;
event(Port, [Pid0, Pid1, Pid2, Pid3, Pid4], Type, Timeout) ->
event_1(Port, [Pid0,Pid1,Pid2,Pid3,Pid4] = Pids, Type, Timeout) ->
receive
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(_Len1), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(_Len2), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(_Len3), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(_Len4), ?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4),
?UINT16(Len), ?UINT16(Type), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply)
>>}} when Len =:= 2 + byte_size(Reply) ->
binary_to_term(Reply);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} when Len =:= 2 + byte_size(Reply) ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>),
event_1(Port, Pids, Type, Timeout);
{Port, {data, <<
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid0),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid1),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid2),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid3),
?UINT16(?ALCOVE_MSG_STDOUT), ?UINT32(Pid4),
?UINT16(Len), ?UINT16(Type1), Reply/binary
>>}} ->
events(Port, Pids, Type,
<<?UINT16(Len), ?UINT16(Type1), Reply/binary>>)
after
Timeout ->
false
end.
% Decode a payload holding one or more length-prefixed messages.
%
% Returns the term carried by the first message whose type is Type, or
% 'false' if no message of that type is present. Every other decoded
% message is sent to the calling process as {Port, {Tag, Pids, Term}}.
events(Port, Pids, Type, Reply) ->
    events(Port, Pids, Type, Reply, []).

% Payload exhausted: the accumulator holds the decoded messages in
% reverse arrival order.
events(Port, _Pids, ReqType, <<>>, Acc0) ->
    Wanted = type_to_atom(ReqType),
    Decoded = lists:reverse(Acc0),
    % Only the first message of the requested type is removed; later
    % messages with the same tag are requeued with everything else.
    Requeue = lists:keydelete(Wanted, 1, Decoded),
    lists:foreach(fun(Msg) -> self() ! {Port, Msg} end, Requeue),
    case lists:keyfind(Wanted, 1, Decoded) of
        {Wanted, _, Data} ->
            Data;
        false ->
            false
    end;
% Peel one message off the front of the payload and keep going.
events(Port, Pids, ReqType,
        <<?UINT16(Len), ?UINT16(Type), Reply/binary>>, Acc) ->
    % Len counts the 2 byte message type field as well as the term
    PayloadSize = Len - 2,
    <<Payload:PayloadSize/binary, Remaining/binary>> = Reply,
    Msg = {type_to_atom(Type), Pids, binary_to_term(Payload)},
    events(Port, Pids, ReqType, Remaining, [Msg | Acc]).
-spec stdin(port(),[integer()],iodata()) -> 'true'.
stdin(Port, [], Data) ->
cast(Port, Data);
@@ -358,11 +493,17 @@ find_executable(Exe) ->
N
end.
atom_to_type(call) -> ?ALCOVE_MSG_CALL;
atom_to_type(event) -> ?ALCOVE_MSG_EVENT;
atom_to_type(stdin) -> ?ALCOVE_MSG_STDIN;
atom_to_type(stdout) -> ?ALCOVE_MSG_STDOUT;
atom_to_type(stderr) -> ?ALCOVE_MSG_STDERR.
% Map a message tag atom to its numeric message type constant. This is
% the inverse of type_to_atom/1; any other argument fails with
% function_clause.
atom_to_type(alcove_call) -> ?ALCOVE_MSG_CALL;
atom_to_type(alcove_event) -> ?ALCOVE_MSG_EVENT;
atom_to_type(alcove_stdin) -> ?ALCOVE_MSG_STDIN;
atom_to_type(alcove_stdout) -> ?ALCOVE_MSG_STDOUT;
atom_to_type(alcove_stderr) -> ?ALCOVE_MSG_STDERR.
% Map a numeric message type constant to the atom used to tag decoded
% messages: events/5 builds {Tag, Pids, Term} tuples with it and event/4
% matches on the same tag when polling the mailbox. Any other argument
% fails with function_clause.
type_to_atom(?ALCOVE_MSG_CALL) -> alcove_call;
type_to_atom(?ALCOVE_MSG_EVENT) -> alcove_event;
type_to_atom(?ALCOVE_MSG_STDIN) -> alcove_stdin;
type_to_atom(?ALCOVE_MSG_STDOUT) -> alcove_stdout;
type_to_atom(?ALCOVE_MSG_STDERR) -> alcove_stderr.
basedir(Module) ->
case code:priv_dir(Module) of

View File

@@ -29,6 +29,7 @@ alcove_test_() ->
run(State) ->
% Test order must be maintained
[
msg(State),
version(State),
pid(State),
getpid(State),
@@ -44,6 +45,8 @@ run(State) ->
setuid(State),
fork(State),
signal(State),
portstress(State),
forkstress(State),
prctl(State),
execvp(State),
stdout(State),
@@ -71,6 +74,33 @@ start() ->
% Fixture cleanup: stop the driver port held in the test state tuple.
stop({_, Port, _}) ->
    alcove_drv:stop(Port).
% Feed a hand-built coalesced payload (three events, one call reply, one
% more event) to alcove_drv:events/4, then drain the pushed-back events
% with alcove_drv:event/4.
%
% All driver calls run while the test list is being built; only the
% comparisons are deferred by the ?_assertEqual wrappers.
msg({_, Port, Child}) ->
    % Frame layout: length, message type, term.
    % Type bytes 0,1 followed by the external term format of {signal,17};
    % the assertions below expect this to be read back as an event.
    Signal = <<0,16, 0,1, 131,104,2,100,0,6,115,105,103,110,97,108,97,17>>,
    % Type bytes 0,0 followed by the external term format of <<"0.2.0">>;
    % the assertions below expect this to be returned as the call reply.
    Version = <<0,13, 0,0, 131,109,0,0,0,5,48,46,50,46,48>>,
    Msg = <<Signal/binary, Signal/binary, Signal/binary,
            Version/binary, Signal/binary>>,

    Reply = alcove_drv:events(Port, [Child], ?ALCOVE_MSG_CALL, Msg),

    % Four events were queued, so the fifth poll (timeout 0) finds nothing
    [Sig1, Sig2, Sig3, Sig4, Sig5] =
        [ alcove_drv:event(Port, [Child], ?ALCOVE_MSG_EVENT, 0)
            || _ <- lists:seq(1, 5) ],

    [
        ?_assertEqual(<<"0.2.0">>, Reply),
        ?_assertEqual({signal,17}, Sig1),
        ?_assertEqual({signal,17}, Sig2),
        ?_assertEqual({signal,17}, Sig3),
        ?_assertEqual({signal,17}, Sig4),
        ?_assertEqual(false, Sig5)
    ].
% The version reported by the port must be a binary.
version({_, Port, _}) ->
    Reply = alcove:version(Port),
    ?_assertEqual(true, is_binary(Reply)).
@@ -228,6 +258,19 @@ signal({_, Port, _Child}) ->
?_assertEqual({error,esrch}, Search)
].
% Issue 1000 back-to-back version calls to the child and require that
% none of them came back as 'false'.
portstress({_, Port, Child}) ->
    Versions = [ alcove:version(Port, [Child]) || _ <- lists:seq(1,1000) ],
    % Dropping the 'false' results must leave the list unchanged
    Succeeded = [ V || V <- Versions, V =/= false ],
    ?_assertEqual(Succeeded, Versions).
% Fork a fresh child from the port and run 100 fork/kill/version rounds
% underneath it. The rounds run here, while the test object is built;
% only the final comparison is deferred by ?_assertEqual.
forkstress({_, Port, _}) ->
    {ok, Pid} = alcove:fork(Port),
    Outcome = forkstress_1(Port, Pid, 100),
    ?_assertEqual(ok, Outcome).
prctl({linux, Port, _Child}) ->
{ok, Fork} = alcove:fork(Port),
@@ -315,3 +358,15 @@ flush(stdout, Port, Pids) ->
_ ->
flush(stdout, Port, Pids)
end.
% Run N rounds of: fork a grandchild under Child, send it signal 15, then
% make a version call to Child.
%
% Returns 'ok' once all rounds complete, or 'fail' as soon as a version
% call reports a timeout. A failed fork or kill crashes on the assertive
% match instead.
forkstress_1(_Port, _Child, 0) ->
    ok;
forkstress_1(Port, Child, N) ->
    {ok, Fork} = alcove:fork(Port, [Child]),
    ok = alcove:kill(Port, [Child], Fork, 15),
    case alcove:version(Port, [Child]) of
        {error, timedout} ->
            fail;
        % portstress in this file treats 'false' as the marker of a failed
        % call, so do not let it fall into the catch-all below and count
        % as a success.
        % NOTE(review): presumably 'false' is what alcove:version/2 returns
        % when the driver times out -- confirm against alcove.erl.
        false ->
            fail;
        _ ->
            forkstress_1(Port, Child, N-1)
    end.