OTP gen_fsm

I’m always looking for those links

FSM stands for finite state machine, so look it up if you still have no idea what it really is.
After gen_server and gen_event, gen_fsm is a straightforward behaviour. The only thing that wasn’t obvious to me from the beginning was that a gen_fsm process crashes when it receives an event that cannot be matched in the current state: the StateName/2 (or StateName/3) callback simply fails with function_clause. I don’t know why I assumed it would just ignore an unmatched event, given that an FSM can only move to the next state through an action that is enabled and performed in the current state. Well..
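To make the crash-on-unmatched-event behaviour concrete, here is a minimal sketch of a turnstile state machine. It uses gen_statem rather than gen_fsm, because gen_fsm has been deprecated in favour of gen_statem since OTP 20. In the state_functions callback mode each state is a function, and an event that no clause matches raises function_clause and crashes the process, just as described above. The module name turnstile and its coin/push events are hypothetical; the catch-all clauses are what make it ignore unexpected events instead of crashing.

```erlang
-module(turnstile).
-behaviour(gen_statem).

-export([start_link/0, coin/1, push/1]).
-export([init/1, callback_mode/0, locked/3, unlocked/3]).

start_link() ->
    gen_statem:start_link(?MODULE, [], []).

coin(Pid) -> gen_statem:call(Pid, coin).
push(Pid) -> gen_statem:call(Pid, push).

init([]) ->
    {ok, locked, 0}.                 % initial state name and data (coins paid)

callback_mode() ->
    state_functions.                 % one function per state: locked/3, unlocked/3

locked({call, From}, coin, Coins) ->
    {next_state, unlocked, Coins + 1, [{reply, From, unlocked}]};
%% Catch-all: without this clause a push while locked raises
%% function_clause and the state machine process crashes.
locked({call, From}, _Other, Coins) ->
    {keep_state, Coins, [{reply, From, ignored}]}.

unlocked({call, From}, push, Coins) ->
    {next_state, locked, Coins, [{reply, From, locked}]};
unlocked({call, From}, _Other, Coins) ->
    {keep_state, Coins, [{reply, From, ignored}]}.
```

Remove the two catch-all clauses and calling push/1 on a fresh turnstile crashes the state machine with function_clause.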

And that’s all, folks ;)

OTP gen_event

I’m always looking for those links

It took me quite a long time to wrap my head around how it really works. First of all, the gen_event behaviour should be thought of as two, or sometimes three, separate elements:

  1. an event handler callback module that knows how to deal with an incoming event
  2. an event manager process that receives all events and passes them to all registered event handlers
  3. [OPTIONAL] an event handler guard process that the event manager notifies when an event handler has been removed due to some error

At the beginning it is a bit confusing because it is so similar to gen_server, especially when both the event handler and the manager are written in one module. So bear in mind that gen_event:start spawns a new process that does nothing interesting on its own; it will not do anything useful until you add a callback module that knows how to process events. Many callback modules can be registered with one event manager (with add_handler/3), and then each of them receives every event. An event handler module is never turned into a separate process (its callbacks run inside the manager process), but it can have its own state.

When an exception is raised while a handle_* callback is executing, the event manager catches it and removes the failing event handler so that the manager itself stays alive. In other words, an error inside one of its handlers does not bring the event manager down. If we want to keep track of our handler module, we need to register it from an external process with add_sup_handler/3. The event manager will then notify that process with a {gen_event_EXIT, Handler, Reason} message when the supervised handler is removed, so that process can add the handler back in turn. See the gen_event documentation for more information.
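The ge_mng module below calls simple_hnd_guard:start/3 and simple_hnd_guard:add/1, but that module isn't shown. Here is a minimal sketch of what such a guard could look like, assuming it is a plain spawned process: it adds the handler itself with gen_event:add_sup_handler/3 (so the manager reports to it), and when the manager sends {gen_event_EXIT, Handler, Reason} for anything other than a normal removal, a shutdown or a swap, it adds the handler back. The internal messages ({add, From} and {added, Guard, Result}) are made up for this sketch, and Args is wrapped in a list to match how ge_mng:add_handler/2 and ge:init/1 pass it.

```erlang
-module(simple_hnd_guard).
-export([start/3, add/1]).

%% Spawn a (hypothetical) guard process for handler Module on Manager.
start(Manager, Module, Args) ->
    {ok, spawn(fun() -> wait_for_add(Manager, Module, Args) end)}.

%% Ask the guard to install the handler; returns what add_sup_handler/3 returned.
add(Guard) ->
    Guard ! {add, self()},
    receive
        {added, Guard, Result} -> Result
    end.

wait_for_add(Manager, Module, Args) ->
    receive
        {add, From} ->
            %% The handler is added from the guard process, so the manager
            %% reports to this process if the handler is ever removed.
            Result = gen_event:add_sup_handler(Manager, Module, [Args]),
            From ! {added, self(), Result},
            case Result of
                ok -> guard(Manager, Module, Args);
                _Error -> ok                            % nothing to guard
            end
    end.

guard(Manager, Module, Args) ->
    receive
        {gen_event_EXIT, Module, normal} -> ok;            % deleted on purpose
        {gen_event_EXIT, Module, shutdown} -> ok;          % manager is stopping
        {gen_event_EXIT, Module, {swapped, _, _}} -> ok;   % replaced by another handler
        {gen_event_EXIT, Module, Reason} ->
            io:format("handler ~p removed: ~p, re-adding~n", [Module, Reason]),
            ok = gen_event:add_sup_handler(Manager, Module, [Args]),
            guard(Manager, Module, Args)
    end.
```

A handler that crashes on every event will be re-added every time; a real guard would probably limit the number of restarts.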

The event behaviour is mostly used for logging errors and debug or progress information, but it can be used for anything you consider useful :).

When the Erlang VM is started with SASL, alarm_handler is available. It is a simple event manager preinstalled with a really simple event handler module. That handler stores every alarm routed to it in a list, so it can later pass them all on to another event handler module provided by the user (when the handler is swapped). It also means that if the default handler is never replaced, every alarm that is set and never cleared stays in that list, and the list can keep growing until all memory is consumed :) An alarm is an {AlarmId, Description} tuple, and clear_alarm/1 removes it by its id. You can use it like this:

> erl -boot start_sasl
> alarm_handler:set_alarm({my_alarm, "my alarm"}).
> alarm_handler:get_alarms().
> alarm_handler:clear_alarm(my_alarm).

Now it’s easy :) so let’s look at a small example.

event manager

-module(ge_mng).

-define(REF, ?MODULE).
%% API
-export([start_link/0, add_handler/2, add_guarded_handler/2,
         get_handlers/0, info/1]).

start_link() ->
    gen_event:start_link({local, ?REF}). 

add_handler(ModuleName, Args) ->
    ok = gen_event:add_handler(?REF, ModuleName, [Args]).

add_guarded_handler(ModuleName, Args) ->
    {ok, Pid} = simple_hnd_guard:start(?REF, ModuleName, Args),
    simple_hnd_guard:add(Pid).

get_handlers() ->
    gen_event:which_handlers(?REF).

info(Msg) ->
    gen_event:notify(?REF, Msg).

OK, I could include this simple code in the callback module below, but to me it is clearer to separate the manager from the callback part. Note also that the manager module has no -behaviour directive, as it only exports API functions: the gen_event manager process itself is generic and needs no callbacks.

event handler

-module(ge).
-behaviour(gen_event).

%% gen_event callbacks
-export([init/1, handle_event/2, handle_call/2, 
	 handle_info/2, terminate/2, code_change/3]).

-record(state, {}).

init([_Args]) ->
    {ok, #state{}}.

handle_event(Event, State) ->
    io:format("eh ~p | event : ~p~n", [self(), Event]),
    {ok, State}.

handle_call(_Request, State) ->
    Reply = ok,
    {ok, Reply, State}.

handle_info(_Info, State) -> {ok, State}.
terminate(_Reason, _State) ->  ok.
code_change(_OldVsn, State, _Extra) ->  {ok, State}.
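Since an event handler runs inside the manager process but still has its own state, a handler can, for example, count the events it has seen and hand the count out through gen_event:call/3. A minimal sketch; the module name ge_counter and the count request are hypothetical, and init/1 takes [_Args] only to match the way ge_mng:add_handler/2 wraps its argument.

```erlang
-module(ge_counter).
-behaviour(gen_event).

-export([init/1, handle_event/2, handle_call/2,
         handle_info/2, terminate/2, code_change/3]).

%% The handler state is a plain integer: the number of events seen so far.
init([_Args]) ->
    {ok, 0}.

handle_event(_Event, Count) ->
    {ok, Count + 1}.

%% gen_event:call(Manager, ge_counter, count) returns the current count.
handle_call(count, Count) ->
    {ok, Count, Count};
handle_call(_Other, Count) ->
    {ok, {error, unknown_request}, Count}.

handle_info(_Info, Count) -> {ok, Count}.
terminate(_Reason, _Count) -> ok.
code_change(_OldVsn, Count, _Extra) -> {ok, Count}.
```

With the manager above: after ge_mng:start_link(), ge_mng:add_handler(ge_counter, []) and ge_mng:info(hello), the call gen_event:call(ge_mng, ge_counter, count) returns 1. Both ge and ge_counter can be registered at the same time, and each receives every event.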

OTP gen_server

I’m always looking for those links

The gen_server behaviour is the easiest of all the behaviours out there, and the source below is equivalent to the counter code I showed before.


-module(gs).

-behaviour(gen_server).

%% API
-export([start_link/1]).
-export([inc/2, dec/2, get/1, set/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
	 terminate/2, code_change/3]).

start_link(InitialCounter) ->
    gen_server:start_link(?MODULE, [InitialCounter], []).

inc(Pid, Val)->
    ok = gen_server:cast(Pid, {inc, Val}).
dec(Pid, Val)->
    ok = gen_server:cast(Pid, {dec, Val}).
get(Pid)->
    {counter, State} = gen_server:call(Pid, get),
    State.
set(Pid, NewState)->
    ok = gen_server:cast(Pid, {set, NewState}).  

%%callbacks-------------------------------------------
init([InitialCounter]) ->
    %% do needed computation; gen_server runs the receive loop for us
    State = InitialCounter,
    {ok, State}.

handle_call(get, _From, State) ->
    Reply = {counter, State},
    {reply, Reply, State}.

handle_cast({inc, Val}, State) ->
    {noreply, State+Val};
handle_cast({dec, Val}, State) ->
    {noreply, State-Val};
handle_cast({set, NewState}, _State) ->
    {noreply, NewState}.

%% Description: Handling all non call/cast messages
handle_info(_Info, State) ->{noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) ->{ok, State}.

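OK, the code grew again :) but we gained a clear, easy-to-maintain server that just counts, and all the looping is hidden from us. What I didn’t understand at first is why handle_cast/2 gets no From value, or maybe I missed something.. The reason is that a cast is fire-and-forget: gen_server:cast/2 returns ok immediately and no reply is ever expected, so there is no caller waiting for an answer. If the server needs to know who sent a cast, the sender has to put its pid (self()) into the message itself.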
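If the server should tell the sender something after a cast, the sender's pid simply has to travel inside the message. A minimal sketch (module gs_notify and the {counter, Server, Value} notification are hypothetical): inc_and_notify/2 returns ok at once, and the new value arrives later as an ordinary message.

```erlang
-module(gs_notify).
-behaviour(gen_server).

-export([start_link/1, inc_and_notify/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link(Initial) ->
    gen_server:start_link(?MODULE, Initial, []).

%% Fire and forget: returns ok immediately, the result comes later as a message.
inc_and_notify(Pid, Val) ->
    gen_server:cast(Pid, {inc, Val, self()}).

init(Initial) ->
    {ok, Initial}.

handle_call(_Request, _From, State) ->
    {reply, {error, unknown_call}, State}.

handle_cast({inc, Val, Sender}, State) ->
    NewState = State + Val,
    Sender ! {counter, self(), NewState},   % the sender's identity came in the message
    {noreply, NewState}.

handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```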

The manual is more or less clear, but for a long time I had no idea what reply/2 and enter_loop/3 are for and how to use them.

reply(Client, Reply) -> Result
Types:
Client – see below
Reply = term()
Result = term()

This function can be used by a gen_server to explicitly send a reply to a client that called call/2,3 or multi_call/2,3,4, when the reply cannot be defined in the return value of Module:handle_call/3.
Client must be the From argument provided to the callback function. Reply is an arbitrary term, which will be given back to the client as the return value of call/2,3 or multi_call/2,3,4.
The return value Result is not further defined, and should always be ignored.

So

when the reply cannot be defined in the return value of Module:handle_call/3.

really means that while the caller (C) is still waiting for a reply (gen_server:call/2 waits up to 5 seconds by default; gen_server:call/3 lets you pass infinity to wait forever), we (service A) can save the From value, call another service (B) asynchronously, and return {noreply, State} from handle_call/3. While waiting for the reply from service B we can process the next call (or cast), and as soon as service B answers we send the result back to the caller with gen_server:reply/2.

async_get(Pid, Val) ->
    {async_reply, Result} = gen_server:call(Pid, {async_call, Val}),
    Result.
async_reply(Pid, From, Result) ->
    ok = gen_server:cast(Pid, {async_reply, From, Result}).

handle_call({async_call, Val}, From, State) ->
    %% either save From in a local session table:
    %%   ets:insert(State#state.tab_name, {SessionID, From}),
    %%   serviceB:process(SessionID, self(), Val),

    %% OR simply send From to the other service, which passes it back later
    serviceB:process(self(), From, Val),
    {noreply, State};    % no reply yet, the caller stays blocked
...

handle_cast({async_reply, From, Result}, State) ->
    %% with the session table, B would send back SessionID instead of From:
    %%   [{SessionID, From}] = ets:lookup(State#state.tab_name, SessionID),
    %%   ets:delete(State#state.tab_name, SessionID),

    %% OR From came back to us directly
    gen_server:reply(From, {async_reply, Result}),
    {noreply, State};
...
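The fragment above leaves service B abstract. Here is a self-contained sketch where "service B" is just a spawned worker that doubles the value. The module name async_srv and the {service_b_done, From, Result} message are hypothetical, and the result comes back through handle_info/2 instead of a cast; the point is the same: handle_call/3 returns {noreply, ...} and gen_server:reply/2 answers the caller later.

```erlang
-module(async_srv).
-behaviour(gen_server).

-export([start_link/0, async_get/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Blocks the caller until the server calls gen_server:reply/2.
async_get(Pid, Val) ->
    gen_server:call(Pid, {async_call, Val}).

init([]) ->
    {ok, 0}.                         % state: number of requests handed off

handle_call({async_call, Val}, From, N) ->
    Server = self(),
    %% Hypothetical "service B": a worker that does the slow part and sends
    %% the result, together with From, back to the server.
    spawn(fun() -> Server ! {service_b_done, From, Val * 2} end),
    {noreply, N + 1}.                % no reply yet; the server is free meanwhile

handle_cast(_Msg, N) ->
    {noreply, N}.

handle_info({service_b_done, From, Result}, N) ->
    gen_server:reply(From, Result),  % unblocks the original caller
    {noreply, N};
handle_info(_Other, N) ->
    {noreply, N}.

terminate(_Reason, _N) -> ok.
code_change(_OldVsn, N, _Extra) -> {ok, N}.
```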

I still have no idea how to use enter_loop/3 :) maybe some day :)
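For the record, here is one common way to use enter_loop/3, as a sketch (the module late_srv is hypothetical): the process is started with proc_lib:start_link/3 instead of gen_server:start_link/3, acknowledges its start with proc_lib:init_ack/1 whenever it likes, and only then turns itself into a gen_server with gen_server:enter_loop/3, which never returns. This lets start_link return before slow initialisation has finished; requests that arrive in the meantime simply wait in the mailbox.

```erlang
-module(late_srv).
-behaviour(gen_server).

-export([start_link/1, value/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         terminate/2, code_change/3]).

%% Start through proc_lib instead of gen_server:start_link/3.
start_link(Initial) ->
    proc_lib:start_link(?MODULE, init, [Initial]).

value(Pid) ->
    gen_server:call(Pid, value).

%% Runs in the new process. gen_server never calls this as its init/1
%% callback here; we decide when to acknowledge and when to become a gen_server.
init(Initial) ->
    proc_lib:init_ack({ok, self()}),             % start_link/1 returns now
    State = Initial,                             % slow setup could go here
    gen_server:enter_loop(?MODULE, [], State).   % never returns

handle_call(value, _From, State) -> {reply, State, State}.
handle_cast(_Msg, State) -> {noreply, State}.
handle_info(_Info, State) -> {noreply, State}.
terminate(_Reason, _State) -> ok.
code_change(_OldVsn, State, _Extra) -> {ok, State}.
```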

OTP behaviours

OTP stands for Open Telecom Platform. It is a way to standardize and organize common, repeated patterns into behaviours that everybody should follow.

A few good links to follow:

issues with messages and receive

Each message sent to a process by its PID is stored in that process’s mailbox. The mailbox is a simple FIFO queue that is scanned by the receive ... after ... end expression. receive blocks until one of the incoming messages matches one of the provided patterns (or until the after timeout expires).

%%from "Programming Erlang" book by Joe Armstrong
receive
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
after Timeout ->
    TimeoutExpressions
end
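A small, runnable use of receive ... after ... end that also gives one answer to the delivery problem listed next: send a request, then wait for a confirmation, for a monitor's 'DOWN' message (delivered immediately, with reason noproc, if the receiver is already dead), or for a timeout. This is a sketch; the module name confirm and the {Msg, From, Ref} / {ack, Ref} protocol are assumptions, and it uses a monitor rather than the link and trap_exit mechanism discussed below.

```erlang
-module(confirm).
-export([send_and_confirm/3]).

%% Send Msg to Pid and wait for {ack, Ref}. The monitor turns "the receiver
%% is (or becomes) dead" into a 'DOWN' message; the after clause bounds the wait.
send_and_confirm(Pid, Msg, TimeoutMs) ->
    Ref = erlang:monitor(process, Pid),
    Pid ! {Msg, self(), Ref},
    receive
        {ack, Ref} ->
            erlang:demonitor(Ref, [flush]),     % drop a 'DOWN' that may follow
            ok;
        {'DOWN', Ref, process, Pid, Reason} ->
            {error, {receiver_down, Reason}}
    after TimeoutMs ->
        erlang:demonitor(Ref, [flush]),
        {error, timeout}
    end.
```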

The problems are:

  1. making sure a message reached its destination

    This problem is discussed at length here. On the same node, Erlang delivers every message sent to a living process, but no error is raised when you send a message to the pid of a process that has already died: the message is silently dropped. (Sending to a name that isn’t registered, on the other hand, does raise badarg.) To deal with that, Erlang provides links and exit trapping, so a process gets an 'EXIT' message when a linked process dies for some reason.

    Now we come to the heart of the problem: the mechanism would be great if it weren’t for race conditions and the fact that 'EXIT' messages are queued like any other message. In other words, it can take a while before we notice that the linked process is dead, because its 'EXIT' message sits behind everything else already in our mailbox.

    The solution is more about answering a few questions yourself: do we really care that messages have been lost? If so, do we wait forever until the other process handles our message and sends a confirmation back, or do we use timeouts? All in all, there is no general, out-of-the-box solution.

  2. growing mailbox due to fast production and slow consumption of messages.

    Let’s recall how receive works: when a process enters a receive expression, it scans the mailbox from the oldest message and sets aside every message that doesn’t match any pattern. If no message matches, it blocks and waits for a new one; when one matches, it removes that message, evaluates the expressions for the matched pattern, and leaves all the skipped messages in the mailbox in their original order. Then it all starts over. If handling a message takes a long time while other processes produce messages faster, the mailbox grows, so every time the process re-enters receive it has more and more messages to scan and set aside (this is costly when many queued messages don’t match; a receive that matches anything just takes the first one). The article advises making synchronous calls to the consumer of our messages: producers then cannot outrun it, and in turn the service becomes much faster.

  3. handling priority messages

    Because processes are shielded from each other, you can’t switch the behaviour of a process (from the context of another process) in response to some change in the system state. You must send it a message with a switch directive, and if that process’s mailbox is big, the change will not take effect immediately, which is sometimes not acceptable. So one could write nested receive statements to process priority messages first:

        receive
            {priority, X} -> X
        after 0 ->
            receive
                %% match whatever comes
                X -> X
            end
        end

    and this is all great, but:

    1. as the article puts it:

      The main problem with the example in Figure 1, is that we do not take into consideration that when evaluation is resumed from the inner blocking receive we may have more than one message in the mailbox. In a worst case scenario, all but the first, of potentially a huge number, of elements could be priority messages. In this scenario we would actually have accomplished the very opposite of what we intended to do.

      With the implementation shown, we may process a low-priority message first: once we are blocked in the inner receive, whatever arrives next is taken, even if a high-priority message arrives right after it.

    2. it is a rather inefficient solution for a large mailbox, because every pass through the outer receive scans the whole mailbox from the beginning looking for a priority message.

    If the business model requires a hierarchy of priority messages, the article can be useful.
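A runnable version of the nested priority receive (module prio is hypothetical), plus a drain/0 helper that takes every message currently queued in the order next/0 picks them. With a mailbox that already holds both kinds, priority messages do jump the queue; the limitation the article describes only shows up when next/0 has to block in the inner receive.

```erlang
-module(prio).
-export([next/0, drain/0]).

%% Take the next message, preferring {priority, X}. If the mailbox holds no
%% priority message we block in the inner receive and take whatever comes first.
next() ->
    receive
        {priority, X} -> {priority, X}
    after 0 ->
        receive
            Any -> Any
        end
    end.

%% Take every message currently in the mailbox, in the order next/0 picks them.
drain() ->
    drain([]).

drain(Acc) ->
    case erlang:process_info(self(), message_queue_len) of
        {message_queue_len, 0} -> lists:reverse(Acc);
        {message_queue_len, _} -> drain([next() | Acc])
    end.
```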

The conclusion is: use synchronous calls, keep the mailbox as small as possible, and don’t use priority messages unless you need them. I’m just wondering whether we won’t end up with a synchronize-everything model.
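To see why synchronous calls keep mailboxes small, here is a sketch with a hypothetical slow consumer that takes about 1 ms per message. Fire-and-forget sends pile up in its mailbox, while a producer that waits for each confirmation can never get ahead of it. The module backpressure and its message formats are made up for illustration, and the exact asynchronous queue length varies from run to run.

```erlang
-module(backpressure).
-export([run/2]).

%% Hypothetical consumer: handles one message at a time, slowly.
consumer() ->
    receive
        {work, From, Ref} ->            % synchronous request: confirm when done
            timer:sleep(1),
            From ! {done, Ref},
            consumer();
        {work, _Item} ->                % asynchronous request: no confirmation
            timer:sleep(1),
            consumer()
    end.

%% Send N requests and return the consumer's mailbox length afterwards.
run(async, N) ->
    C = spawn(fun consumer/0),
    [C ! {work, I} || I <- lists:seq(1, N)],
    queue_len_and_kill(C);
run(sync, N) ->
    C = spawn(fun consumer/0),
    lists:foreach(fun(_) ->
                          Ref = make_ref(),
                          C ! {work, self(), Ref},
                          receive {done, Ref} -> ok end   % wait before sending more
                  end, lists:seq(1, N)),
    queue_len_and_kill(C).

queue_len_and_kill(C) ->
    {message_queue_len, Len} = erlang:process_info(C, message_queue_len),
    exit(C, kill),
    Len.
```

backpressure:run(sync, 100) returns 0, while backpressure:run(async, 1000) typically reports most of the 1000 messages still waiting in the consumer's mailbox.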

threads

As I showed before, simple code can take more lines in Erlang than in well-established languages. However, when I start two or more threads in Java and pass them a reference to a previously created Counter object, I will probably end up with junk rather than a proper counter state. That is because I haven’t guaranteed that the threads access and change my Counter in some order: no two threads may update it at the same time, since inc and dec are read-modify-write operations (this.state += val reads, adds and writes back). Reads are less dangerous here, because assigning a single reference is atomic, although without synchronization a reader may still see a stale value. I may be lucky and the changes to Counter’s state happen without interference, but I wouldn’t bet on it.

To ensure consistency, the state must be locked while one thread is performing a read or write operation, and the other threads must be blocked until that thread finishes.

// Counter.java
//with synchronization
package sample;

public class Counter {
    private Long state;

    public Counter(Long initialCounter){
        //do needed computation
        this.state = initialCounter;
    }
    public synchronized Long getState() {return state;}
    public synchronized void setState(Long state) {
        this.state = state;
    }

    public synchronized void inc(Long val){this.state += val;}
    public synchronized void dec(Long val){this.state -= val;}
}

In other words, to gain access to a resource a thread may need to block for a while until it obtains the lock, and meanwhile it cannot do anything else. Locks often cause bottlenecks or deadlocks, and it is much easier to write code in which threads block forever than to find out where the problem really lies. The programmer must lock as little as possible and pay attention when calling other objects’ methods from synchronized code, as that can lead to many problems (for example, a deadlock when two threads take the same two locks in opposite order).

In other words, concurrent programming is not trivial. Erlang’s way is different. Threads don’t create objects and pass them around; in Erlang a thread is itself the object, shielded from other threads. Erlang threads don’t share any resources (hence they are called processes, and all values are copied between them) and they communicate by sending and receiving messages, as demonstrated below.

set(Pid, NewState)->
    Pid!{set, NewState}.
...
do_counting(State) ->
    receive
	{set, NewState} ->
	    ...
    end.
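To see the difference in practice, here is a sketch (module shared_counter is hypothetical) where several processes update one counter process concurrently. No locks are needed because only the counter process ever touches its state; the messages are simply handled one at a time. Each worker finishes with a synchronous get: message order is only guaranteed between a single pair of processes, so this makes sure all of a worker's increments have been handled before it reports back.

```erlang
-module(shared_counter).
-export([run/2]).

%% The counter process owns its state; others can only send it messages.
counter(State) ->
    receive
        {inc, Val} -> counter(State + Val);
        {get, From} -> From ! {counter, State}, counter(State)
    end.

worker(Counter, Incs, Parent) ->
    [Counter ! {inc, 1} || _ <- lists:seq(1, Incs)],
    %% Same sender, so this get is handled after all of this worker's incs.
    Counter ! {get, self()},
    receive {counter, _} -> ok end,
    Parent ! {finished, self()}.

%% Start Workers processes that each add 1 Incs times, then read the total.
run(Workers, Incs) ->
    Counter = spawn(fun() -> counter(0) end),
    Parent = self(),
    Pids = [spawn(fun() -> worker(Counter, Incs, Parent) end)
            || _ <- lists:seq(1, Workers)],
    [receive {finished, P} -> ok end || P <- Pids],
    Counter ! {get, self()},
    receive {counter, Value} -> Value end.
```

shared_counter:run(4, 1000) returns 4000 every time.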

On the other hand this approach has its side effects.

making stateless things change

Coming from OOP or procedural programming, it is very hard to construct systems from functions that receive input and return a computed result. A function can’t store its state, because everything it binds is gone once it returns.

On the other hand, a function can start a new process, and processes are the ones that can hold state and change it as the system changes. I started to think of processes as a kind of object in OOP.

A process has its interface (all the messages it can handle) and a state that is guarded. A process has its own responsibilities and encapsulates the internal mechanisms and data structures it uses. Of course, that doesn’t mean you should ask for inheritance or polymorphism support in Erlang. I just want to say that modelling systems is as easy as with object-oriented languages (and, for multi-threaded systems, even easier).

So let’s see how to create a mutable object:

%%counter.erl
-module(counter).
-export([init/1, inc/2, dec/2, get/1, set/2]).

inc(Pid, Val)->
    Pid!{inc, Val}.
dec(Pid, Val)->
    Pid!{dec, Val}.
get(Pid)->
    ClientPid = self(),
    Pid!{get, ClientPid},
    receive
	{counter, State} ->
	    State
    end.
set(Pid, NewState)->
    Pid!{set, NewState}.

init(InitialCounter) ->
    %% do needed computation and start looping;
    %% run it in its own process, e.g. Pid = spawn(counter, init, [0])
    do_counting(InitialCounter).

do_counting(State) ->
    receive
	{inc, Val} ->
	    do_counting(State + Val);
	{dec, Val} ->
	    do_counting(State - Val);
	{get, ClientPid} ->
	    ClientPid ! {counter, State},
	    do_counting(State);
	{set, NewState} ->
	    do_counting(NewState)
    end.    % nothing after receive, so every recursive call is a tail call

So it is equivalent to this Java code:

// Counter.java
package sample;

public class Counter {
    private Long state;

    public Counter(Long initialCounter){
        //do needed computation
        this.state = initialCounter;
    }
    public Long getState() {return state;}
    public void setState(Long state) {this.state = state;}

    public void inc(Long val){this.state += val;}
    public void dec(Long val){this.state -= val;}
}

OK, I admit that the Erlang code is a bit longer, but it is thread-safe and can interact with other processes with no further changes.

starting with erlang

lets start with something simple.

erl language itself is quite trivial, but sometimes it is hard to switch to new semantic.
so give it a try

%%trapexit.erl
-module(trapexit).
-export([start/1]).
start(N) ->
    process_flag(trap_exit, true),
    PongPID = spawn(fun() -> pong() end),
    register(pong, PongPID),
    link(PongPID),
    %% ping waits for 'go' so it is registered before pong tries to answer it
    PingPID = spawn(fun() -> receive go -> ping(N) end end),
    register(ping, PingPID),
    link(PingPID),
    PingPID ! go,
    receive
        {'EXIT', _From, Reason} ->
            io:format("Msg ~p~n", [Reason]),
            exit(PongPID, kill),
            timer:sleep(25000)
    end.
%------------------------------------------------------
ping(0) ->
    io:format("ping | stop~n", []),
    exit(ping);
ping(N) ->
    pong ! ping,
    receive
        pong ->
            io:format("ping | received pong~n", [])
    end,
    ping(N - 1).
pong() ->
    receive
        ping ->
            io:format("pong | received ping~n", []),
            ping ! pong,
            pong();
        %% never matched as written: pong does not trap exits,
        %% and a 'kill' exit signal cannot be trapped anyway
        {'EXIT', _From, Reason} ->
            io:format("pong | exiting, got ~p~n", [Reason])
    end.

  1. it has a flat namespace for modules and registered process names (that is so … annoying)
  2. a module is the basic unit of code on the Erlang platform; the -module directive names it, and the name must match the file name (trapexit.erl)
  3. -module is followed by an -export directive that lists which functions are “public”
  4. everything is composed of stateless functions
  5. you can construct your data from tuples, lists, atoms, strings and numbers
  6. tuples (like {'EXIT', From, normal}) are ordered, fixed-size containers that can hold any type of data, including other tuples
  7. atoms start with a lowercase letter (like pong) or are single-quoted (like 'EXIT'); they don’t hold any additional value, but they can be distinguished from each other
  8. variables can be bound only once (that is … good :) ) and always start with an uppercase letter (like Reason); if you don’t use a variable, prefix it with “_” (like _From) and the compiler won’t warn about it
  9. spawn creates a new process
  10. the receive block and ! (the send operator, equivalent to erlang:send/2) let two processes communicate
  11. exit ends process execution (exit/2 sends an exit signal to another process, as in exit(PongPID, kill))

A very simple program: we have three processes. The first one starts execution, spawns the other two and then blocks until ping has exchanged N messages with pong (10 times for trapexit:start(10)). Then the blocked (main) process receives ping’s exit signal (turned into an 'EXIT' message because of process_flag(trap_exit, true)) and kills the pong process.
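Registered names live in one flat namespace, so running two copies of trapexit at the same time fails in register/2. Here is a sketch of the same exchange that passes pids in messages instead (module name pingpong_pids is hypothetical; links and exit trapping are left out to keep it short):

```erlang
-module(pingpong_pids).
-export([start/1]).

%% Ping and pong find each other through pids carried in messages,
%% so no names are registered and several runs can go on in parallel.
start(N) ->
    Pong = spawn(fun pong/0),
    Parent = self(),
    spawn(fun() -> Parent ! {ping_done, ping(N, Pong, 0)} end),
    receive
        {ping_done, Received} ->
            Pong ! stop,
            Received                    % number of pongs ping received
    end.

ping(0, _Pong, Received) ->
    Received;
ping(N, Pong, Received) ->
    Pong ! {ping, self()},
    receive pong -> ok end,
    ping(N - 1, Pong, Received + 1).

pong() ->
    receive
        {ping, From} ->
            From ! pong,
            pong();
        stop ->
            ok
    end.
```

pingpong_pids:start(10) returns 10.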

What is worth emphasising is that atoms are just placeholders; they only start to mean something in a specific context.

read more:
the book Programming Erlang: Software for a Concurrent World by Joe Armstrong
http://www.erlang.org/starting.html

http://en.wikipedia.org/wiki/Erlang_programming_language

Hello world!

Welcome to all that progress.

Recently I’ve started playing with Erlang, and there are still many things I don’t know. Anyway, I’m quite happy with how it solves many concurrency problems: it isolates the objects that would usually be referenced by many concurrent threads and serializes all access to them by queueing messages from the actors in the system.

Of course, it is more like swimming in a big plastic bubble than snorkelling with all the fish in the sea. But in my opinion concurrency is more like the cold Baltic than the warm Mediterranean, so I prefer to stay in my bubble :). On the other hand, Erlang is not a cure for every concurrency problem, and there are many things that need to be taken into account while modelling a system.

I don’t want to write another tutorial or copy information from manuals, but rather to create a place where I can put all my thoughts and links to interesting articles on how to write fault-tolerant and distributed systems.

What about Scala, distributed data stores??? And why does my day have only 24h???

Obligatory legal stuff

Unless otherwise noted, all code appearing on this blog is released into the public domain and provided “as-is”, without any warranty of any kind, express or implied, including but not limited to the warranties of merchantability, fitness for a particular purpose and noninfringement. In no event shall the author(s) be liable for any claim, damages, or other liability, whether in an action of contract, tort or otherwise, arising from, out of or in connection with the software or the use or other dealings in the software.