Wakatta!

Like Eureka!, only cooler

Seven Languages in Seven Weeks Erlang Day 3

One last day with Erlang; it was about time we got to perhaps its most significant aspect: its concurrency model and its recipe for error recovery, embodied in its motto, “Let it crash.”

Erlang makes it very cheap and easy to spawn processes (in this context, lightweight processes inside the Erlang VM rather than operating system processes; separate Erlang runtimes are called nodes), and to have the various processes communicate by sending messages to each other. Extending the communication to the network follows the same pattern, and is just as easy.
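As a minimal sketch of that model (the module echo_demo and its functions start/0 and ask/2 are made up for illustration, not taken from the book), spawning a process and exchanging a message with it might look like this:

```erlang
-module(echo_demo).
-export([start/0, ask/2]).

%% Spawn a lightweight process running loop/0 and return its pid.
start() ->
    spawn(fun loop/0).

%% Reply to each {From, Msg} with {self(), Msg}; stop on the atom stop.
loop() ->
    receive
        {From, Msg} ->
            From ! {self(), Msg},
            loop();
        stop ->
            ok
    end.

%% Send Msg to Pid and wait up to one second for the tagged reply.
ask(Pid, Msg) ->
    Pid ! {self(), Msg},
    receive
        {Pid, Reply} -> Reply
    after 1000 ->
        timeout
    end.
```

Tagging the reply with the sender's pid lets ask/2 ignore unrelated messages that may already be sitting in the caller's mailbox.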

Processes can also monitor each other, in various ways, so that a crashed process can be restarted. As this is both easy and natural to do, Erlang discourages complex error recovery methods, and instead proposes that processes be allowed to crash and be restarted. While it is possible to abuse this principle, it can lead to much simpler code and general logic; the lack of mutable data is another advantage in this regard, as there is less risk of leaving anything in an inconsistent state (it is still possible, as there are updatable stores).
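One of those ways is a monitor. The following is only a sketch (monitor_demo and run/0 are hypothetical names): it spawns a process that crashes at once and waits for the 'DOWN' message that the runtime delivers to the monitoring process.

```erlang
-module(monitor_demo).
-export([run/0]).

%% Spawn a process that exits immediately with reason boom, monitor it,
%% and return the reason carried by the resulting 'DOWN' message.
run() ->
    {Pid, Ref} = spawn_monitor(fun() -> exit(boom) end),
    receive
        {'DOWN', Ref, process, Pid, Reason} ->
            {crashed, Reason}
    after 1000 ->
        timeout
    end.
```

Unlike a link, a monitor is one-directional: the watcher is told about the crash but is not taken down by it, so it does not need to trap exits.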

The book rightfully does not claim to be exhaustive in its coverage, but the exercises are rich and complex enough to offer a glimpse of what is possible in Erlang.

Other capabilities, such as the ability to update code while it is running, are not covered, but they are part of the set of features that make Erlang so well suited to robust applications.

Exercises

An OTP service that will restart a process if it dies

That sounds like the supervisor module’s job description. It is not trivial, however: supervised processes must have a descriptor that explains how to start, stop and restart them.
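To make the idea of a descriptor concrete, here is a sketch of a child specification written in the map form accepted by OTP 18 and later (the listing further down uses the older tuple form). The module name spec_demo and the child id translator are assumptions for illustration; the start tuple points at the translate_service_sup:start/0 function used in this post.

```erlang
-module(spec_demo).
-export([child_spec/0]).

%% Describe how a supervisor should start, restart and stop one child.
child_spec() ->
    #{id => translator,                           % name used by the supervisor
      start => {translate_service_sup, start, []}, % {Module, Function, Args}
      restart => permanent,                       % always restart on exit
      shutdown => brutal_kill,                    % stop with exit(Pid, kill)
      type => worker,
      modules => [translate_service_sup]}.
```

supervisor:check_childspecs/1 can be used to check that such a specification is syntactically valid before handing it to a supervisor.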

There is a fairly detailed description of the setup in Learn you some Erlang for great good, but it requires a good understanding of everything that comes before. A much shorter introduction is found here.

For instance, using supervisor for keeping the translate_service up and running (assuming it dies on unknown words):

Supervised Translation Service (translate_service_sup.erl)
-module(translate_service_sup).
-behaviour(supervisor).
-export([loop/0, translate/1]).
-export([start/0]).
-export([init/1]).
-export([start_service/0]).

loop() ->
  receive
    {From, "casa"} ->
      From ! "house",
      loop();
    {From, "blanca"} ->
      From ! "white",
      loop();
    {From,M} ->
      From ! "I do not understand",
      exit({M, not_understood, received_at, erlang:time()})
  end.

translate(Word) ->
  translator ! {self(), Word},
  receive
    Translation -> Translation
  end.

start() ->
    io:fwrite("Starting...~n"),
    register(translator, spawn_link(translate_service_sup, loop, [])),
    {ok, whereis(translator)}.

init(_) ->
    {ok, {{one_for_one, 1, 60},
          [{translate_service_sup, {translate_service_sup, start, []},
            permanent, brutal_kill, worker, [translate_service_sup]}]}}.

start_service() ->
    io:fwrite("start_service~n"),
    supervisor:start_link(translate_service_sup, []).

I register the spawned process to the atom translator; this is because I could not find a way to retrieve the process id of the translate loop.
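For what it is worth, one possible way to get at the pid without registering a name is supervisor:which_children/1, which lists each child as {Id, Pid, Type, Modules}. The sketch below is self-contained and uses made-up names (child_lookup, worker_pid/1, and a dummy worker that just waits); it is not the code used in this post.

```erlang
-module(child_lookup).
-behaviour(supervisor).
-export([start_link/0, init/1, start_worker/0, worker_pid/1]).

start_link() ->
    supervisor:start_link(?MODULE, []).

%% One permanent child with the id worker.
init([]) ->
    {ok, {{one_for_one, 1, 60},
          [{worker, {?MODULE, start_worker, []},
            permanent, brutal_kill, worker, [?MODULE]}]}}.

%% Hypothetical worker: idles until it receives stop.
start_worker() ->
    {ok, spawn_link(fun() -> receive stop -> ok end end)}.

%% Ask supervisor Sup for the current pid of the child with id worker.
worker_pid(Sup) ->
    case lists:keyfind(worker, 1, supervisor:which_children(Sup)) of
        {worker, Pid, _Type, _Modules} when is_pid(Pid) -> {ok, Pid};
        _ -> not_running
    end.
```

The lookup has to be repeated after every restart, since the restarted child gets a new pid; a registered name avoids that, which is a reasonable argument for the approach taken above.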

Starting the supervised process is now simple:

Using the Supervised Translator
1> c(translate_service_sup).
{ok,translate_service_sup}
2> translate_service_sup:start_service().
start_service
Starting...
{ok,<0.38.0>}
3> translate_service_sup:translate("casa").
"house"
4> translate_service_sup:translate("que").
Starting...
"I do not understand"
5> translate_service_sup:translate("casa").
"house"
6> translate_service_sup:translate("que").
Starting...
"I do not understand"
7> translate_service_sup:translate("que").
** exception exit: shutdown

As seen above, the translator chokes on an unknown word, but is restarted immediately. However, if the process dies too often (here, more than once within 60 seconds), the supervisor gives up and shuts down, taking the service with it.

Documentation for building a simple OTP server

That question is a bit open-ended. If we are talking about a simple TCP server, then the gen_tcp module is enough to get a server, as shown in this post.

To go further, and add supervisors and other OTP goodies, there is a tutorial on trapexit.org, but it uses code generating tools which are not part of the standard Erlang distribution.

Monitor the translate_service

Just to explore the API introduced today, I went a bit beyond the exercise, and made the doctor able to monitor any kind of process, and attach it to a supplied atom.

First, a utility function start creates a new doctor process, then sends it a message to spawn the monitored process.

The new loop function takes both the function to be spawned and monitored, and the atom to attach it to. Both parameters are kept in the loop as arguments.

Improved doctor Module (doctor.erl)
-module(doctor).
-export([start/2]).
-export([loop/2]).

start(F, A) ->
  D = spawn(doctor, loop, [F, A]),
  D ! new,
  D.

loop(F, A) ->
  process_flag(trap_exit, true),
  receive
    new ->
      io:format("Creating and monitoring process, attaching to atom ~p.~n", [A]),
      register(A, spawn_link(F)),
      loop(F, A);
    {'EXIT', From, Reason} ->
      io:format("The process ~p ~p died with reason ~p.", [A, From, Reason]),
      io:format(" Restarting~n."),
      self() ! new,
      loop(F, A)
  end.

The translate_service is slightly modified to die of shame on untranslatable words.

Modified translate_service Module (translate_service.erl)
-module(translate_service).
-export([loop/0, translate/2]).

loop() ->
  receive
    {From, "casa"} ->
      From ! "house",
      loop();
    {From, "blanca"} ->
      From ! "white",
      loop();
    {From,M} ->
      From ! "I do not understand",
      exit({M, not_understood, received_at, erlang:time()})
  end.

translate(To, Word) ->
  To ! {self(), Word},
  receive
    Translation -> Translation
  end.

With these defined, it is possible to run two monitored services and attach them to different atoms:

Testing the Doctor
1> DocRev = doctor:start(fun roulette:loop/0, revolver).
Creating and monitoring process, attaching to atom revolver.
<0.33.0>
2> DocTrans = doctor:start(fun translate_service:loop/0, translator).
Creating and monitoring process, attaching to atom translator.
<0.36.0>
3> revolver ! 1.
1
4> revolver ! 3.
3
5> revolver ! 1.
1
6> translate_service:translate(translator, "casa").
click.
bang.
The process revolver <0.34.0> died with reason {roulette,die,at,{10,31,0}}. Restarting
.Creating and monitoring process, attaching to atom revolver.
"house"
7> translate_service:translate(translator, "que").
The process translator <0.37.0> died with reason {"que",not_understood,
                                                  received_at,
                                                  {10,31,0}}."I do not understand"
 Restarting
.Creating and monitoring process, attaching to atom translator.
8> translate_service:translate(translator, "casa").
"house"

Both services are restarted properly when they die.

Self monitoring Doctor

For the Doctor to successfully monitor itself, it should also be able to restart (or restart monitoring) whatever process it was monitoring when it died.

For this, I create a new d_monitor function that spawns a monitor loop. The atom argument is used (or abused) to indicate whether we are starting a doctor monitoring loop, or the target function monitoring loop. In the former case, d_monitor is called again; in the latter the target function is spawned.

When d_monitor is called from start, the former case is triggered; when called from loop, the latter case is.

This means that for every call to start, there are two “nested” loops running, the outside one monitoring the inside one, and the inside one in charge of the target function.

To test the code, I gave the inside loop a non zero parameter which is decreased every time it has to restart the monitored function. When it reaches zero, the monitoring stops, and has to be restarted as well.

Finally, instead of directly spawning the target function, I use the check_for function to first try to locate a process given the atom; only if this fails is a new process spawned. The function is not very robust (in particular, if two monitors try to restart the process at the same time, one might fail rather than reattach, because of the order of concurrent evaluation).
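A somewhat more defensive variant of that lookup could be sketched as follows. The names attach_demo and attach/2 are hypothetical, and the sketch assumes it is called from a loop that traps exits, as the doctor does. It calls whereis/1 only once and retries when another process wins the race to register the name.

```erlang
-module(attach_demo).
-export([attach/2]).

%% Link to the process registered as Name, or spawn F and register it.
%% Returns {attached, Pid} or {started, Pid}.
attach(Name, F) ->
    case whereis(Name) of
        undefined ->
            start(Name, F);
        Pid ->
            %% If Pid died after the lookup, a caller that traps exits
            %% receives an 'EXIT' message with reason noproc and can
            %% handle it like any other death.
            link(Pid),
            {attached, Pid}
    end.

start(Name, F) ->
    Pid = spawn_link(F),
    try register(Name, Pid) of
        true -> {started, Pid}
    catch
        error:badarg ->
            %% Someone else registered Name first: drop our copy and
            %% attach to theirs instead.
            unlink(Pid),
            exit(Pid, kill),
            attach(Name, F)
    end.
```

This narrows the window for the race described above but does not close it entirely; a supervisor remains the more robust tool for this job.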

Module Doctor_2 (doctor_2.erl)
-module(doctor_2).
-export([start/2]).
-export([loop/3]).

start(F, A) ->
  d_monitor(F, {out, A}, -1).

d_monitor(F, A, N) ->
  D = spawn(doctor_2, loop, [F, A, N]),
  D ! new,
  D.

loop(_, _, 0) -> exit({running, out_of, time, erlang:time()});
loop(F, A, N) ->
  process_flag(trap_exit, true),
  receive
    new ->
      case A of
        {out, At} ->
          io:format("Creating and monitoring process.~n"),
          P = d_monitor(F, At, 3),
          link(P);
        _ ->
          check_for(A, F)
      end,
      loop(F, A, N);
    {'EXIT', From, Reason} ->
      io:format("The process ~p ~p died with reason ~p.", [A, From, Reason]),
      io:format(" Restarting~n."),
      self() ! new,
      loop(F, A, N-1)
  end.

check_for(A, F) ->
  Found = lists:member(A, registered()) andalso erlang:is_process_alive(whereis(A)),
  if
    Found ->
      io:format("reattaching to running process ~p.~n", [A]),
      link(whereis(A)),
      true;
    true ->
      io:format("Creating and monitoring process, attaching to atom ~p.~n", [A]),
      register(A, spawn_link(F)),
      true
  end.

To test the new code, I have written it so that the inner loop will die after 3 restarts of the monitored process:

Self monitoring Doctor
1> doctor_2:start(fun roulette:loop/0, revolver).
Creating and monitoring process.
Creating and monitoring process, attaching to atom revolver.
<0.33.0>
2> revolver ! 3.
bang.
3
The process revolver <0.35.0> died with reason {roulette,die,at,{11,6,30}}. Restarting
.Creating and monitoring process, attaching to atom revolver.
3> revolver ! 3.
bang.
3
The process revolver <0.38.0> died with reason {roulette,die,at,{11,6,32}}. Restarting
.Creating and monitoring process, attaching to atom revolver.
4> revolver ! 3.
bang.
3
The process revolver <0.40.0> died with reason {roulette,die,at,{11,6,34}}. Restarting
.The process {out,revolver} <0.34.0> died with reason {running,out_of,time,
                                                      {11,6,34}}. Restarting
.Creating and monitoring process.
Creating and monitoring process, attaching to atom revolver.
5> revolver ! 1.
click.
1

As seen at the third call to revolver ! 3, after restarting the process, the monitor dies, then is restarted and attaches to the already running revolver process.

Monitoring the monitor. And vice-versa

For this exercise, I will go back to the simpler, original doctor module (the one that only keeps the shooter alive).

I will use three atoms, doctor, monitor and revolver (for roulette:loop), to register the processes. doctor:loop will be able to restart both monitor and revolver; monitor:loop will be able to restart doctor; roulette will have no such capacity.

To keep things easy, I will also make sure the processes can easily be killed.

First, I’ve moved the function that looks for a process by name to its own module; I also make sure that each module has a start function that registers a spawned process and triggers the initialization process. This function is now the argument to the look_for function, so look_for no longer has to do the spawning and registering.

Look for process (look.erl)
-module(look).
-export([look_for/2]).

look_for(A, F) ->
    Found = lists:member(A, registered()) andalso erlang:is_process_alive(whereis(A)),
    if
        Found ->
            io:format("reattaching to running process ~p.~n", [A]),
            link(whereis(A)),
            false;
        true ->
            io:format("Creating and monitoring process, attaching to atom ~p.~n", [A]),
            F(),
            true
    end.

The doctor module is similar to the original. The differences are that it responds to the die message, and that it handles a start message that checks for the monitor process. It uses the monitor:start function, as explained above, so that if the monitor has to be started, it will also be properly initialized.

Doctor (doctor.erl)
-module(doctor).
-export([loop/0, start/0]).

loop() ->
    process_flag(trap_exit, true),
    receive
        start ->
            io:format("Looking for monitor...~n"),
            look:look_for(monitor, fun monitor:start/0),
            self() ! new,
            loop();
        new ->
            io:format("Looking for revolver...~n"),
            look:look_for(revolver, fun roulette:start/0),
            loop();
        die ->
            io:format("Aaargh...~n"),
            exit({doctor, died, at, erlang:time()});
        {'EXIT', From, Reason} ->
            io:format("The process ~p died with reason ~p.~n", [From, Reason]),
            self() ! start,
            loop()
    end.

start() ->
    register(doctor, spawn_link(fun doctor:loop/0)),
    doctor ! start.

The monitor module is simpler: it only needs to check for the doctor process, running the doctor:start function if needed.

Monitor (monitor.erl)
-module(monitor).
-export([loop/0, start/0]).

loop() ->
    process_flag(trap_exit, true),
    receive
        start ->
            io:format("Looking for doctor...~n"),
            look:look_for(doctor, fun doctor:start/0),
            loop();
        die ->
            io:format("Aaargh...~n"),
            exit({monitor, died, at, erlang:time()});
        {'EXIT', From, Reason} ->
            io:format("The process ~p died with reason ~p.~n", [From, Reason]),
            self() ! start,
            loop()
    end.

start() ->
    register(monitor, spawn_link(fun monitor:loop/0)),
    monitor ! start.

Finally, the slightly modified roulette module (it now has a roulette:start function):

Roulette (roulette.erl)
-module(roulette).
-export([loop/0]).
-export([start/0]).

loop() ->
  receive
    3 -> io:format("bang.~n"), exit({roulette, die, at, erlang:time()});
    _ -> io:format("click.~n"), loop()
  end.

start() ->
    register(revolver, spawn_link(fun roulette:loop/0)).

The basic doctoring still works:

Testing the Monitored Doctor
1> doctor:start().
Looking for monitor...
start
Creating and monitoring process, attaching to atom monitor.
Looking for revolver...
Looking for doctor...
Creating and monitoring process, attaching to atom revolver.
reattaching to running process doctor.
2> revolver ! 1.
1
click.
3> revolver ! 3.
3
bang.
The process <0.36.0> died with reason {roulette,die,at,{21,33,1}}.
Looking for monitor...
reattaching to running process monitor.
Looking for revolver...
Creating and monitoring process, attaching to atom revolver.
4> revolver ! 1.
1
click.

The doctor can be killed; monitor will restart it:

Testing the Monitored Doctor 2
5> doctor ! die.
die
Aaargh...
The process <0.33.0> died with reason {doctor,died,at,{21,33,47}}.
Looking for doctor...
Creating and monitoring process, attaching to atom doctor.
Looking for monitor...
reattaching to running process monitor.
Looking for revolver...
Creating and monitoring process, attaching to atom revolver.
** exception error: {doctor,died,at,{21,33,47}}
6> revolver ! 3.
3
bang.
The process <0.43.0> died with reason {roulette,die,at,{21,33,56}}.
Looking for monitor...
reattaching to running process monitor.
Looking for revolver...
Creating and monitoring process, attaching to atom revolver.
7> revolver ! 1.
click.
1

Finally, monitor can also be killed; it is restarted by doctor:

Testing the Monitored Doctor 3
8> monitor ! die.
die
Aaargh...
The process <0.35.0> died with reason {monitor,died,at,{21,34,44}}.
Looking for monitor...
Creating and monitoring process, attaching to atom monitor.
Looking for revolver...
Looking for doctor...
reattaching to running process revolver.
reattaching to running process doctor.
9> doctor ! die.
Aaargh...
die
The process <0.42.0> died with reason {doctor,died,at,{21,34,47}}.
Looking for doctor...
Creating and monitoring process, attaching to atom doctor.
Looking for monitor...
reattaching to running process monitor.
Looking for revolver...
Creating and monitoring process, attaching to atom revolver.
10> revolver ! 3.
bang.
3
The process <0.52.0> died with reason {roulette,die,at,{21,34,51}}.
Looking for monitor...
reattaching to running process monitor.
Looking for revolver...
Creating and monitoring process, attaching to atom revolver.
11> revolver ! 1.
click.
1

Logging messages to a file

For this I will just derive the code from the one in this post (which I mentioned above).

Rather than echoing the received messages, they are written to disk:

Logging messages (logger.erl)
-module(logger).
-export([start_server/0, connect/1, recv_loop/2]).

-define(LISTEN_PORT, 9000).
-define(TCP_OPTS, [binary, {packet, raw}, {nodelay, true}, {reuseaddr, true}, {active, once}]).

start_server() ->
    case gen_tcp:listen(?LISTEN_PORT, ?TCP_OPTS) of
        {ok, Listen} -> spawn(?MODULE, connect, [Listen]),
                        io:format("~p Server Started.~n", [erlang:localtime()]);
        Error ->
            io:format("Error: ~p~n", [Error])
    end.

connect(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    inet:setopts(Socket, ?TCP_OPTS),
    spawn(fun() -> connect(Listen) end),
    {ok, File} = file:open("log.txt", [append]),
    recv_loop(Socket,File),
    file:close(File),
    gen_tcp:close(Socket).

recv_loop(Socket,File) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, Data} ->
            io:format("~p ~p ~p~n", [inet:peername(Socket), erlang:localtime(), Data]),
            file:write(File, Data),
            gen_tcp:send(Socket, "I wrote down " ++ Data),
            recv_loop(Socket,File);
        {tcp_closed, Socket} ->
            io:format("~p Client Disconnected.~n", [erlang:localtime()])
    end.

This works nicely because the file:write function accepts bytes rather than requiring a string, so the binary received from the socket can be written as is, with no conversion.
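More precisely, file:write/2 takes iodata, so binaries, strings and nested lists of both are all acceptable. A small sketch, with the hypothetical names log_demo and append/2, that appends one entry followed by a newline:

```erlang
-module(log_demo).
-export([append/2]).

%% Append Data (a binary, a string, or any other iodata) to the file at
%% Path, followed by a newline. Returns ok.
append(Path, Data) ->
    {ok, File} = file:open(Path, [append]),
    ok = file:write(File, [Data, $\n]),
    file:close(File).
```

Wrapping the data in a list, as in [Data, $\n], adds the newline without copying or converting the payload.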

Network aware translate_service

Once again I use the simple TCP server code. For this exercise, I have extended the translation service so that it can learn new words in a given session: using the put word:meaning command, the server will learn a new word. This is done by keeping a dictionary (a list of tuples) as argument for the recv_loop function.

I convert from binary to list because lists are slightly easier to work with.
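The command handling can be sketched on its own, away from the socket code. The version below is an illustration under a few assumptions, not the listing that follows: dict_demo and handle/2 are made-up names, it relies on string:trim/1 and string:split/2 (available from OTP 20), and it uses lists:keystore/4 so that teaching a word twice replaces the old meaning instead of adding a second entry.

```erlang
-module(dict_demo).
-export([handle/2]).

%% Handle one line of input against Dict, a list of {Word, Meaning}
%% tuples. Returns {Reply, NewDict}.
handle(Line, Dict) ->
    case string:trim(Line) of
        "put " ++ Def ->
            %% Split at the first colon into the word and its meaning.
            case string:split(Def, ":") of
                [Word, Meaning] ->
                    {"Noted", lists:keystore(Word, 1, Dict, {Word, Meaning})};
                _ ->
                    {"Usage: put word:meaning", Dict}
            end;
        Word ->
            case lists:keyfind(Word, 1, Dict) of
                {Word, Meaning} -> {Meaning, Dict};
                false -> {"I do not understand " ++ Word, Dict}
            end
    end.
```

Keeping this logic in a pure function makes it easy to try in the shell, for example dict_demo:handle("put que:what\r\n", []).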

Network aware translate_service (translate_service.erl)
-module(translate_service).
-export([start_server/0]).
-export([connect/1]).
-export([cleanup/1]).

-define(LISTEN_PORT, 9000).
-define(TCP_OPTS, [binary, {packet, raw}, {nodelay, true}, {reuseaddr, true}, {active, once}]).

start_server() ->
    case gen_tcp:listen(?LISTEN_PORT, ?TCP_OPTS) of
        {ok, Listen} -> spawn(?MODULE, connect, [Listen]),
                        io:format("~p Server Started.~n", [erlang:localtime()]);
        Error ->
            io:format("Error: ~p~n", [Error])
    end.

connect(Listen) ->
    {ok, Socket} = gen_tcp:accept(Listen),
    inet:setopts(Socket, ?TCP_OPTS),
    spawn(fun() -> connect(Listen) end),
    recv_loop(Socket,[{"casa", "house"}, {"blanca", "white"}]),
    gen_tcp:close(Socket).

recv_loop(Socket,Dict) ->
    inet:setopts(Socket, [{active, once}]),
    receive
        {tcp, Socket, Data} ->
            io:format("~p ~p ~p~n", [inet:peername(Socket), erlang:localtime(), Data]),
            {Msg, Dict2} = process_message(Dict, cleanup(Data)),
            gen_tcp:send(Socket, binary:list_to_bin(Msg)),
            recv_loop(Socket, Dict2);
        {tcp_closed, Socket} ->
            io:format("~p Client Disconnected.~n", [erlang:localtime()])
    end.

cleanup(Data) ->
    binary:bin_to_list(Data, {0, byte_size(Data) - 2}).

process_message(Dict, Data) ->
    case Data of
        [$p,$u,$t,$ |Def] ->
            [Word, Meaning] = string:tokens(Def, ":"),
            {"Noted\r\n", [{Word, Meaning}|Dict]};
        _ -> {translate(Data, Dict) ++ "\r\n", Dict}
    end.

translate(Word, Dict) ->
    M = lists:keyfind(Word, 1, Dict),
    case M of
        false ->
            "I do not understand " ++ Word;
        {_, Meaning} -> Meaning
    end.

As seen in the interaction below, the translator knows its basic words, and can learn new ones:

$ telnet localhost 9000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
casa
house
blanca
white
que
I do not understand que
put que:what
Noted
que
what
^]
telnet> Connection closed.

Wrapping up Day 3 and Erlang

Erlang was a fun and interesting language to play with. Looking at the documentation and existing tutorials (especially the intermediate to advanced ones), I had a feeling that, if Java can be said to be Enterprise, Erlang is Industrial. It does look like a control panel in a power plant: rich, complex and yet as simple as possible for what it is meant to achieve, and likely to kill you if you approach it with buzzwords in mind.

This is another language I will be eager to get back to.
