October 25, 2015
In Erlang and Elixir we talk about process the way you would talk about threads in some other language. They are not system process. You cannot see them using ps
or top
. They are managed and created by the Erlang VM.
And the Erlang VM is pretty good at managing them.
iex(1)> pid = spawn(fn -> IO.puts "im alive" end)
im alive
#PID<0.61.0>
iex(3)> Process.alive? pid
false
So we spawned a process with the job to execute the anonymous function we passed to it, and then it quit. Cool. What else can we do?
A big part of what makes the Erlang concurrency model so great is the use of messages. By starting a processes to do one thing and then communicate with that process through messages you have built a system that scales very easily to be global. We don’t really care that the process we are sending a message to is a local process. It could just as easily be in some datacenter on the other side of the world. No worries. We build a system ready to be scaled.
A message is sent either to a pid
(process id) or to a named process. Lets start by sending messages to a pid.
Start iex.
iex(1)> self
#PID<0.57.0>
iex(2)> send(self, "hello")
"hello"
iex(3)> flush
"hello"
:ok
Self returns the pid of the process we are in now. In this case iex itself.
Send takes two parameters, a pid
and the message.
We have not yet configured our process to listen for messages, but a short hand is to flush
the mailbox for the current process. That will list all messages received.
Alright. Sending and flushing all done.
So if processes talks through messages we need a way to send and listen to messages that is being sent around.
Let’s create a small Worker
process. Create a file called processes.ex
.
defmodule ElixirTest.BasicWorker do
def loop do
receive do
{sender_pid, _} ->
send(sender_pid, {:ok, :pong})
loop
end
end
end
It is just a echo service.
How to use it then?
iex(1)> c("processes.ex")
Compiles the file so we can use the modules from iex. Then spawn a process. Remember to save the pid.
iex(2)> pid = spawn(ElixirTest.BasicWorker, :loop, [])
#PID<0.74.0>
After that send a message to it and check the mail box
, lacking a better term.
iex(3)> send(pid, {self, :ok})
{#PID<0.57.0>, :ok}
iex(4)> flush
{:ok, :pong}
:ok
Awesome. Our process replied. We could just keep on sending messages.
iex(18)> send(pid, {self, :ok})
{#PID<0.57.0>, :ok}
iex(19)> send(pid, {self, :ok})
{#PID<0.57.0>, :ok}
iex(20)> flush
{:ok, :pong}
{:ok, :pong}
:ok
And since everything Elixir is pattern matching we could send very specific messages.
What processing a message takes some time? Think a costly http request or some calculations, will the iex prompt freeze? Let’s see.
Modify your worker to include a delay to simulate some heavy work.
defmodule ElixirTest.BasicWorker do
def loop do
receive do
{sender_pid, _} ->
:timer.sleep(1000)
send(sender_pid, {:ok, :pong})
loop
end
end
end
Reload or recompile the module.
iex(21)> r(ElixirTest.BasicWorker)
And try it out!
iex(26)> send(pid, {self, :ok})
{#PID<0.57.0>, :ok}
iex(27)> flush
:ok
iex(28)> flush
{:ok, :pong}
:ok
The first flush has not received any message from your process but still you are free to keep on doing stuff.
This is not weird or anything special. send
have to be async, otherwise whats the point? But it’s good to be sure.
Everything dies. So how would send behave if the process died?
Let us kill it.
iex(31)> Process.exit(pid, :kill)
true
iex(32)> Process.alive? pid
false
iex(33)> send(pid, {self, :ok})
{#PID<0.57.0>, :ok}
We can still send messages. send
doesn’t wait for a replie so if the process is listening or not doesn’t really matter.
iex(35)> flush
:ok
After a flush we can see that nothing has been sent to our iex process. Well, the worker is dead so that is to be expected.
Iex, our interactive elixir console, is just a process as anything else. We know the pid. So we can do stuff with it, like kill it.
iex(40)> self
#PID<0.57.0>
iex(41)> Process.exit(self, :exit)
** (EXIT from #PID<0.57.0>) :exit
Interactive Elixir (1.1.1) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> self
#PID<0.23081.0>
It restarted it self.
That is not true, but it was restarted. We got a new pid, so it is not the same process.
The iex is part of an application, which has a supervisor. Supervisors have one job, and that is to handle crashed processes. We can see our current supervisor tree and the applications running on our BEAM.
iex(4)> :observer.start
:ok
A gui application will start, where you can browse around and do stuff.
Click the tab Applications and you will see iex listed to the left. The graph shows the tree of processes for that application.
You will see a Supervisor. Klick on that blob and in the menu select Kill. You will see it does not come back.
In the running iex we see a message:
iex(5)>
23:21:16.631 [info] Application iex exited: killed
Killed and closed.
Restart iex and spawn one of our small workers. We will use Process.info
to learn something about it.
iex(2)> pid = spawn(ElixirTest.BasicWorker, :loop, [])
#PID<0.68.0>
iex(3)> self
#PID<0.57.0>
iex(4)> Process.info(pid)
[current_function: {ElixirTest.BasicWorker, :loop, 0},
initial_call: {ElixirTest.BasicWorker, :loop, 0}, status: :waiting,
message_queue_len: 0, messages: [], links: [], dictionary: [],
trap_exit: false, error_handler: :error_handler, priority: :normal,
group_leader: #PID<0.26.0>, total_heap_size: 233, heap_size: 233,
stack_size: 2, reductions: 1,
garbage_collection: [min_bin_vheap_size: 46422, min_heap_size: 233,
fullsweep_after: 65535, minor_gcs: 0], suspending: []]
Lots of information! We can see what the process is doing right now with the current_function
and status
. We can see messageges waiting to be processed. We can see the heap size and more memory information.
Set the delay in our worker to something long, 20 seconds or so. Send 4 messages or so to the process. Check the info now.
iex(10)> send(pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(11)> send(pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(12)> send(pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(13)> send(pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(14)> send(pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(15)> Process.info(pid)
[current_function: {:timer, :sleep, 1},
initial_call: {ElixirTest.BasicWorker, :loop, 0}, status: :waiting,
message_queue_len: 4,
messages: [{#PID<0.57.0>, :ping}, {#PID<0.57.0>, :ping}, {#PID<0.57.0>, :ping},
{#PID<0.57.0>, :ping}], links: [], dictionary: [], trap_exit: false,
error_handler: :error_handler, priority: :normal, group_leader: #PID<0.26.0>,
total_heap_size: 233, heap_size: 233, stack_size: 4, reductions: 5,
garbage_collection: [min_bin_vheap_size: 46422, min_heap_size: 233,
fullsweep_after: 65535, minor_gcs: 0], suspending: []]
The process now have messagequeue_len: 4, meaning it has messages waiting. Since the process only handles one message at a time, the messages to iex will come at 20seconds intervals.
So we found a problem. Our worker is async. But it does not do work in parallell. This will definitely be a problem. To solve this we create a server which only has one job. Listen to messages, and for each message start a worker process that handles the heavy calculations (:timer.sleep in our case).
Show me some code!
defmodule ElixirTest.BasicWorker do
def execute sender_pid do
:timer.sleep(10000)
send(sender_pid, {:ok, :pong})
end
end
defmodule ElixirTest.BasicServer do
def loop do
receive do
{sender_pid, _} ->
worker_pid = spawn(ElixirTest.BasicWorker, :execute, [sender_pid])
loop
end
end
end
For each message the server spawns a worker that does all the calculations and after sends a message to our iex pid.
iex(7)> send(server_pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(8)> send(server_pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(9)> send(server_pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(10)> send(server_pid, {self, :ping})
{#PID<0.57.0>, :ping}
iex(11)> Process.info(server_pid)
[current_function: {ElixirTest.BasicServer, :loop, 0},
initial_call: {ElixirTest.BasicServer, :loop, 0}, status: :waiting,
message_queue_len: 0, messages: [], links: [], dictionary: [],
trap_exit: false, error_handler: :error_handler, priority: :normal,
group_leader: #PID<0.26.0>, total_heap_size: 233, heap_size: 233,
stack_size: 1, reductions: 1149,
garbage_collection: [min_bin_vheap_size: 46422, min_heap_size: 233,
fullsweep_after: 65535, minor_gcs: 0], suspending: []]
iex(12)> flush
:ok
No messages in the queue. All have been processed. Or started at least. We still haven’t received any messages. Wait around 10 seconds and the messages should start to come in.
ex(15)> flush
{:ok, :pong}
{:ok, :pong}
:ok
Cool. So we built a async worker, which didn’t do any work in parallel. Then we modified it with a dispatcher and it got some parallel super powers.
Written by Simon Ström as a way to remember. It's a dev log of thinks I want to remember.