diff --git a/src/pocus_sequential.erl b/src/pocus_sequential.erl index fbfbf5b..708331a 100644 --- a/src/pocus_sequential.erl +++ b/src/pocus_sequential.erl @@ -22,8 +22,8 @@ %%% | | / \ %%% | push |---[data]------+ +-[hash]-+ | %%% |______| | | | | | -%%% | | | | | -%%% ______ | _V___________V_ _|_____ | +%%% init | | | | | +%%% ______/ | _V___________V_ _|_____ | %%% | | | | | | | | %%% | seed |---[hash]--->| hash_function |--->| state | | %%% |______| | |_______________| |_______| | @@ -69,12 +69,14 @@ %%% | | / \ %%% | push |---[data]------+ | %%% |______| | | | -%%% | | | -%%% ______ | _V_____________ | +%%% init | | | +%%% ______/ | _V_____________ | %%% | | | | | | %%% | seed |---[hash]--->| hash_function |<--+ | %%% |______| | |_______________| | | %%% | | | | +%%% | [hash] [hash] | +%%% | | | | %%% ______ | _V_____ | ________ | %%% | | | | | | | | | %%% | pack |------------>| state |---[hash]--+--->| buffer | | @@ -133,12 +135,14 @@ %%% | | / \ %%% | push |---[data]------+ | %%% |______| | | | -%%% | | | -%%% ______ | _V_____________ | +%%% init | | | +%%% ______/ | _V_____________ | %%% | | | | | | %%% | seed |---[hash]--->| hash_function |<-----+ | %%% |______| | |_______________| | | %%% | | | | +%%% | [hash] [thash] | +%%% | | | | %%% | _V_____ _|________ | %%% | | | | | | %%% | | state |---[hash]-->| truncate | | @@ -169,12 +173,13 @@ %%% %%% ``` %%% ___________________________ process __ -%%% / \ -%%% ______ | _______________ | +%%% init / \ +%%% ______/ | _______________ | %%% | | | | | | %%% | seed |---[hash]--->| hash_function |<-----+ | %%% |______| | |_______________| | | %%% | | | | +%%% | [hash] | | %%% | | | | %%% ______ | _V_____ | | %%% | | | | | | | @@ -194,6 +199,32 @@ %%% %%% This module supports all hashes supported by `crypto' module. %%% +%%% == ETS Design (draft) == +%%% +%%% +%%% ``` +%%% ______ _____________________ process __ +%%% | | / \ +%%% | push |---[data]------+ | +%%% |______| | | | +%%% init | | | +%%% ______/ | _V_____________ | +%%% | | | | | | +%%% | seed |---[hash]--->| hash_function | | +%%% |______| | |_______________| | +%%% | | | +%%% | [pid,ts,hash] | +%%% | | | +%%% | _V_____ | +%%% | | | | +%%% | | state | | +%%% | |_______| | +%%% | | +%%% \________________________________/ +%%% +%%% +%%% ''' +%%% %%% @end %%% %%% @todo creates types and do DRY on specification. @@ -247,6 +278,7 @@ -export([handle_cast/2, handle_call/3, handle_info/2]). -include_lib("kernel/include/logger.hrl"). -define(TIMEOUT, 10_000). + -record(?MODULE, { args = [] :: proplists:proplists() , hash = sha256 :: atom() , hash_size = 256 :: pos_integer() @@ -259,6 +291,17 @@ , seed = undefined :: binary() , hash_trunc = undefined :: pos_integer() , trunc_endian = big :: big | little + % access part + , mode = public :: public | private + , master_pid = undefined :: undefined | pid() + , master_node = undefined :: undefined | atom() + , start_time = undefined :: undefined | integer() + }). + +-record(message, { node :: atom() + , pid :: pid() + , time :: integer() + , payload :: term() }). %%-------------------------------------------------------------------- @@ -269,7 +312,8 @@ -spec start_link() -> {ok, pid()}. start_link() -> - gen_server:start_link(?MODULE, [], []). + Message = message({start, []}), + gen_server:start_link(?MODULE, Message, []). %%-------------------------------------------------------------------- %% @doc start a new process with custom parameters. @@ -285,7 +329,8 @@ start_link() -> Return :: {ok, pid()}. start_link(Args) -> - gen_server:start_link(?MODULE, Args, []). + Message = message({start, Args}), + gen_server:start_link(?MODULE, Message, []). %%-------------------------------------------------------------------- %% @doc start a new process with custom parameters and custom @@ -302,7 +347,8 @@ start_link(Args) -> Return :: {ok, pid()}. start_link(Args, Opts) -> - gen_server:start_link(?MODULE, Args, Opts). + Message = message({start, Args}), + gen_server:start_link(?MODULE, Message, Opts). %%-------------------------------------------------------------------- %% @doc API: returns process information. @@ -326,7 +372,8 @@ info(Pid) -> Return :: {ok, proplists:proplists()}. info(Pid, Timeout) -> - gen_server:call(Pid, info, Timeout). + Message = message(info), + gen_server:call(Pid, Message, Timeout). %%-------------------------------------------------------------------- %% @doc API: Resets process state based on passed parameters during @@ -336,7 +383,8 @@ info(Pid, Timeout) -> -spec reset(pid() | atom()) -> ok. reset(Pid) -> - gen_server:cast(Pid, reset). + Message = message(reset), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Uses current hash state output as new data. @@ -348,7 +396,8 @@ reset(Pid) -> Return :: ok. roll(Pid) -> - gen_server:cast(Pid, roll). + Message = message(roll), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Uses curent hash state output as new data N times. @@ -361,7 +410,8 @@ roll(Pid) -> roll(Pid, Loop) when is_integer(Loop), Loop > 0 -> - gen_server:cast(Pid, {roll, Loop}). + Message = message({roll, Loop}), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Returns packaged hash. @@ -385,7 +435,8 @@ package(Pid) -> Return :: {ok, binary()} | timeout. package(Pid, Timeout) -> - gen_server:call(Pid, package, Timeout). + Message = message(package), + gen_server:call(Pid, Message, Timeout). %%-------------------------------------------------------------------- %% @doc API: Adds current hash state output into buffer (package). @@ -396,7 +447,8 @@ package(Pid, Timeout) -> Return :: ok. pack(Pid) -> - gen_server:cast(Pid, pack). + Message = message(pack), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Pulls the current hash state output. @@ -419,7 +471,8 @@ pull(Pid) -> Return :: {ok, binary()} | timeout. pull(Pid, Timeout) -> - gen_server:call(Pid, pull, Timeout). + Message = message(pull), + gen_server:call(Pid, Message, Timeout). %%-------------------------------------------------------------------- %% @doc API: Pushes new values into hash state. @@ -431,7 +484,8 @@ pull(Pid, Timeout) -> Return :: ok. push(Pid, Data) -> - gen_server:cast(Pid, {push, Data}). + Message = message({push, Data}), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Pushes first and then pack. @@ -445,7 +499,8 @@ push(Pid, Data) -> Return :: ok. push_and_pack(Pid, Data) -> - gen_server:cast(Pid, {push_and_pack, Data}). + Message = message({push_and_pack, Data}), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Packing first then pushes. @@ -459,7 +514,8 @@ push_and_pack(Pid, Data) -> Return :: ok. pack_and_push(Pid, Data) -> - gen_server:cast(Pid, {pack_and_push, Data}). + Message = message({pack_and_push, Data}), + gen_server:cast(Pid, Message). %%-------------------------------------------------------------------- %% @doc API: Pushes first and then pull @@ -486,7 +542,8 @@ push_and_pull(Pid, Data) -> Return :: {ok, binary()} | timeout. push_and_pull(Pid, Data, Timeout) -> - gen_server:call(Pid, {push_and_pull, Data}, Timeout). + Message = message({ push_and_pull, Data}), + gen_server:call(Pid, Message, Timeout). %%-------------------------------------------------------------------- %% @doc API: Pulling first and then pushing @@ -513,7 +570,8 @@ pull_and_push(Pid, Data) -> Return :: {ok, binary()} | timeout. pull_and_push(Pid, Data, Timeout) -> - gen_server:call(Pid, {pull_and_push, Data}, Timeout). + Message = message({pull_and_push, Data}), + gen_server:call(Pid, Message, Timeout). %%-------------------------------------------------------------------- %% @doc API: Pushing a drip (binary/bitstring) values using standard @@ -522,17 +580,39 @@ pull_and_push(Pid, Data, Timeout) -> %%-------------------------------------------------------------------- drip(Pid, Drip) when is_binary(Drip); is_bitstring(Drip) -> - Pid ! {drip, Drip}. + Message = message({drip, Drip}), + Pid ! Message. + +%%-------------------------------------------------------------------- +%% @hidden +%% @doc +%% @end +%%-------------------------------------------------------------------- +message(Payload) -> + #message{ node = node() + , pid = self() + , time = erlang:monotonic_time() + , payload = Payload + }. %%-------------------------------------------------------------------- %% @hidden %% @doc gen_server callback. Initializes FSM with arguments. %% @end %%-------------------------------------------------------------------- -init(Args) -> +init(#message{ payload = {start, Args} + , node = Node + , pid = Pid + , time = Time + }) -> + Mode = proplists:get_value(mode, Args, public), PackLimit = proplists:get_value(pack_limit, Args, undefined), - InitState = #?MODULE{ args = Args - , pack_limit = PackLimit + InitState = #?MODULE{ args = Args + , pack_limit = PackLimit + , mode = Mode + , master_pid = Pid + , master_node = Node + , start_time = Time }, init_hash(Args, InitState). @@ -625,7 +705,7 @@ init_trunc(Args, State = #?MODULE{ hash_size = HashSize }) -> case Trunc of undefined -> init_final(Args, State); - _ when is_integer(Trunc), Trunc > 0, Trunc =< Trunc -> + _ when is_integer(Trunc), Trunc > 0, Trunc =< HashSize -> NewState = State#?MODULE{ hash_trunc = Trunc }, init_final(Args, NewState); _ -> @@ -645,13 +725,28 @@ init_final(_Args, State) -> %% @doc gen_server callback. %% @end %%-------------------------------------------------------------------- -handle_call( Msg = info +handle_call( _Msg = #message{ node = Node + , pid = Pid + } + , _From + , State = #?MODULE{ mode = private + , master_pid = MasterPid + , master_node = MasterNode + }) + when {Node, Pid} =/= {MasterNode, MasterPid} -> + io:format("prout"), + {noreply, State}; +handle_call( Msg = #message{ payload = info } , From , State = #?MODULE{ hash = Hash , args = Args , pack_size = PackSize , pack_limit = PackLimit , seed = Seed + , mode = Mode + , master_pid = MasterPid + , master_node = MasterNode + , hash_trunc = Trunc }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, From, State}}]), Info = [ {hash, Hash} @@ -659,22 +754,26 @@ handle_call( Msg = info , {pack_size, PackSize} , {pack_limit, PackLimit} , {seed, Seed} + , {mode, Mode} + , {master_pid, MasterPid} + , {master_node, MasterNode} + , {trunc, Trunc} ], {reply, {ok, Info}, State}; -handle_call( Msg = package +handle_call( Msg = #message{ payload = package } , From , State = #?MODULE{ pack = Pack }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, From, State}}]), ReversedPack = lists:reverse(Pack), Output = << <> || X <- ReversedPack >>, {reply, {ok, Output}, State}; -handle_call( Msg = pull +handle_call( Msg = #message{ payload = pull } , From , State = #?MODULE{ hash_state = HashState }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, From, State}}]), Output = hash_final(HashState, State), {reply, {ok, Output}, State}; -handle_call( Msg = {pull_and_push, Data} +handle_call( Msg = #message{ payload = {pull_and_push, Data} } , From , State = #?MODULE{ hash_state = HashState }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, From, State}}]), @@ -682,7 +781,7 @@ handle_call( Msg = {pull_and_push, Data} NewHashState = hash_update(HashState, Data, State), NewState = State#?MODULE{ hash_state = NewHashState }, {reply, {ok, Output}, NewState}; -handle_call( Msg = {push_and_pull, Data} +handle_call( Msg = #message{ payload = {push_and_pull, Data} } , From , State = #?MODULE{ hash_state = HashState }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, From, State}}]), @@ -696,7 +795,18 @@ handle_call( Msg = {push_and_pull, Data} %% @doc gen_server callback. %% @end %%-------------------------------------------------------------------- -handle_cast(Msg = pack +% private mode, don't answer. +handle_cast( _Msg = #message{ node = Node + , pid = Pid } + , State = #?MODULE{ mode = private + , master_pid = MasterPid + , master_node = MasterNode + }) + when {Node, Pid} =/= {MasterNode, MasterPid} -> + {noreply, State}; + +% pack command, move output hash into buffer +handle_cast( Msg = #message{ payload = pack } , State = #?MODULE{ hash_state = HashState , pack = Pack , pack_size = PackSize @@ -712,7 +822,9 @@ handle_cast(Msg = pack false -> {noreply, State} end; -handle_cast( Msg = {push_and_pack, Data} + +% push_and_pack command +handle_cast( Msg = #message{ payload = {push_and_pack, Data} } , State = #?MODULE{ hash_state = HashState , pack = Pack , pack_size = PackSize @@ -731,7 +843,9 @@ handle_cast( Msg = {push_and_pack, Data} false -> {noreply, State} end; -handle_cast( Msg = {pack_and_push, Data} + +% pack_and_push command +handle_cast( Msg = #message{ payload = {pack_and_push, Data} } , State = #?MODULE{ hash_state = HashState , pack = Pack , pack_size = PackSize @@ -750,25 +864,34 @@ handle_cast( Msg = {pack_and_push, Data} false -> {noreply, State} end; -handle_cast( Msg = {push, Data} + +% push command +handle_cast( Msg = #message{ payload = {push, Data} } , State = #?MODULE{ hash_state = HashState }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, State}}]), NewHashState = hash_update(HashState, Data, State), NewState = State#?MODULE{ hash_state = NewHashState }, {noreply, NewState}; -handle_cast( Msg = roll + +% roll command +handle_cast( Msg = #message{ payload = roll } , State = #?MODULE{ hash_state = HashState }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, State}}]), Output = hash_final(HashState, State), NewHashState = hash_update(HashState, Output, State), NewState = State#?MODULE{ hash_state = NewHashState }, {noreply, NewState}; -handle_cast(Msg = {roll, Counter}, State = #?MODULE{}) -> + +% roll command +handle_cast( Msg = #message{ payload = {roll, Counter} } + , State = #?MODULE{}) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, State}}]), NewHashState = roll_loop(Counter, State), NewState = State#?MODULE{ hash_state = NewHashState }, {noreply, NewState}; -handle_cast( Msg = reset + +% reset command +handle_cast( Msg = #message{ payload = reset } , State = #?MODULE{ args = Args }) -> ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, State}}]), {ok, InitState} = init(Args), @@ -779,8 +902,18 @@ handle_cast( Msg = reset %% @doc gen_server callback. %% @end %%-------------------------------------------------------------------- -handle_info({drip, Drip}, State = #?MODULE{ hash_state = HashState }) +handle_info( _Msg = #message{ node = Node + , pid = Pid } + , State = #?MODULE{ mode = private + , master_pid = MasterPid + , master_node = MasterNode + }) + when {Node, Pid} =/= {MasterNode, MasterPid} -> + {noreply, State}; +handle_info( Msg = #message{ payload = {drip, Drip}} + , State = #?MODULE{ hash_state = HashState }) when is_binary(Drip); is_bitstring(Drip) -> + ?LOG_DEBUG("~p", [{?MODULE, ?FUNCTION_NAME, received, {Msg, State}}]), NewHashState = hash_update(HashState, Drip, State), NewState = State#?MODULE{ hash_state = NewHashState }, {noreply, NewState}; @@ -833,8 +966,7 @@ hash_init(Hash, _State) -> State :: #?MODULE{}, Return :: reference(). -hash_update(Reference, Data, _State = #?MODULE{ hash_trunc = Trunc - , hash_size = HashSize }) +hash_update(Reference, Data, _State = #?MODULE{ hash_size = HashSize }) when is_bitstring(Data), bit_size(Data) < HashSize -> crypto:hash_update(Reference, <>); hash_update(Reference, Data, _State) ->