Tydzień temu dowiedzieliśmy się jak działa maszyna wirtualna erlanga – BEAM. Dziś zaś namacalnie dotkniemy procesy oraz sposób komunikacji między nimi. Więc jeżeli nie czytaliście poprzedniego posta, warto teraz go przeczytać.

spawn

Sposobem tworzenia nowego procesu w elixir jest metoda spawn. Tworzy ona nowy proces i zwraca jego PID (process identifier). Jest ona jednak przeciążona i można ją wołać na dwa różne sposoby. Za pomocą lambdy, lub za pomocą trzech parametrów.

Popatrzmy na dwa sposoby wywoływania, najpierw przez lambdę:

spawn(fn -> IO.puts(1+ 2) end)
> 3
> #PID<0.94.0>

To co zrobił powyższy kod, to:

  • Stworzył nowy proces
  • Zwrócił ID nowego procesu (#PID<0.94.0>)
  • W tym procesie wykonał asynchronicznie operację 1+2 (może ona być szybsza nić zwrócenie PID)

Drugi sposób wywółania:

defmodule SimpleAdd do
    def add(x, y) do
        IO.puts("#{inspect(self)}: #{x + y}");
    end
end

self
spawn(SimpleAdd, :add, [1,2])

Zamiast więc labmdy z informacją co ma zostać wywołane, przekazujemy moduł, funkcję i jej argumenty. Przykład rozszerzyłem o inforamcje o procesie:

  • self zwraca aktualny PID procesu
  • inspect(self) umożliwia dodanie go do stringa

Teraz jak wołamy kod, to dostaniemy nasz aktualny PID a potem wynik działania z informacją w jakim procesie (PID) został kod wykonany.

Podsumowując metoda spawn umożliwia nam tworzenie procesu w którym możemy wykonać jakieś konkretne zadanie. Pominąłem tutaj opis funkcji typu spawn_link który łączy dane proces z naszym i jak jeden z nich padnie to dostaniemy sygnał wyjścia z procesu. Czy też spawn_monitor, który robi to samo co spawn_link ale już nie łączy procesów, więc jedynie dostajemy informację o sygnale wyjście a nie dostaniemy sygnał wyjścia.

send

Nie każdy kod równoległy musi dodawać cyfry. Może zamiast tego robić coś bardziej zaawansowanego, skomplikowane obliczenia, pobranie danych z sieci itp. itd. Wszystko jest fajnie do póki ten kod nie musi się z nami komunikować, czyli robimy fire-forget. Jednak jeżeli chcemy mieć informację zwrotną – załadowany tekst ze strony, wynik analizy złożoności itp. itd. to musimy w jakiś sposób umożliwić komunikację między procesami.

By jeden proces mógł skomunikować się z drugim to musi on wysłać wiadomości, wiadomość jest słana za pomocą metody send. Jakbyśmy chcieli wysłać wiadomość  z wynikiem działania naszej operacji dodawania to nasz spawn wyglądał by mniej więcej tak:

spawn(fn -> send DEST, {:ok, 1 + 2} end)

Gdzie DEST może być jedną z wielu rzeczy w tym PID. Ale także to może być zarejestrowany atom jak i nasz lokalny port. Dla przykładu, możemy mieć atom :addmsg i zarejestrowac go w ten sposób:

Process.register(self, :addmsg)

Teraz nasz spawn z send by wyglądał tak:

spawn(fn -> send :addmsg, {:ok, 1 + 2} end)

Ale, wykonanie już:

spawn(fn -> send self, {:ok, 1 + 2} end)

Nam nie zadziała tak jak byśmy my tego chcieli. A powód jest dość prosty, self tutaj będzie odnosiło się do PID procesu stworzonego przez spawn. Więc aby podać nasz PID musimy stworzyć zmienną:

current = self
spawn(fn -> send current, {:ok, 1 + 2} end)

Druga sprawa, to zauważyliście, że do send jako drugi parametr przekazałem tuple. Jest to ogólnie dobry zwyczaj choć msg nie musi być tuple. To znaczy, moglibyśmy równie dobrze wysłać wartość i to by nam zadziałało :) Ale jak robimy jakiekolwiek inne operacje bardziej skomplikowane to warto jednak dodać informację czy udało się wykonać daną operację. W elixir przyjmuje się wykorzystanie tuple w postaci {:ok, content} i {:error, reason} do informowania o sukcesie lub porażce danej operacji.

receive do/end

Skoro, jesteśmy wstanie już wysyłać wiadomość to pora ją odebrać. W najprostszym przypadku możemy wykonać następujący kod:

send(self, 1)

receive do
    1 -> IO.puts "I got 1"
end

Co tutaj się dzieje i dlaczego to działa?

Pierwsza linijka jest nam znana, wysyłamy wiadomość o treści 1 do naszego aktualnego procesu. Następnie chcemy odebrać wiadomość która spełnia określony wzorzec (działa prawie tak samo jak case). Jak go spełni, to otrzymujemy wiadomość I got 1. Tylko że, my chcemy to otrzymać po tym jak to wysłaliśmy. I co gorsza, to działa! :) dlaczego to działa? Dlatego, że każdy proces ma tak zwaną skrzynkę odbiorczą. Wszystkie wiadomości wysyłane do danego procesu, trafiają do niej. Czyli jak zrobimy 5 razy send i potem receive to dostaniemy wiadomość 5 razy.

Warto już teraz wspomnieć o kilku sprawach z receive:

  • receive blokuje wykonywanie kodu. Jeżeli wiadomości nie ma to będzie tak długo czekał, aż ją otrzyma (przykład nieskończone czekanie)
  • Jeżeli nie ma wzorca który pasuje do wiadomości, to wiadomość nie zostanie odebrana i my dalej będziemy czekać na wiadomość która spełni nasze warunki (przykład brak wzorca)
  • Mailbox działa jak First Come, First Serve. Jak mamy 1000 wiadomości w kolejne które nie mogą zostać z dopasowane do wzorca, to dla każdej wiadomości która przyjdzie, receive będzie szedł od początku przez wszystkie wiadomości próbując je dopasować do wzorca.

Przykład nieskończone czekanie

receive do
    2 -> IO.puts "I got 2"
end

Przykład brak wzorca

send(self, 3)

receive do
    2 -> IO.puts "I got 2"
end

Mamy dwa wyjścia z sytuacji patowej (blokady), albo dajemy wzorzec, który pasuje do wszystkich wiadomości (to nie jest najlepsze rozwiązanie), albo dodajemy warunek, że jak po X milisekundach nic się nie wydarzy, to przerwij oczekiwanie:

# catch anything
send(self, 3)

receive do
    2 -> IO.puts "I got 2"
    _ -> IO.puts "got smth!"
end

# stop waiting after 2 seconds
send(self, 3)

receive do
    2 -> IO.puts "I got 2"
    after 2000 -> IO.puts "timeout"
end

Dodatkowo, tak jak wszystko w elixir, receive zwraca wartość, więc dla przykładu:

my_num = receive do
    2 -> 2
    after 2000 -> 0
end

my_num będzie miało wartość 2 jak przyjdzie dwójka, lub 0 jak będzie timeout. Zastanawiałem się czy nie dać tutaj bardziej zaawansowanego przykładu, jednak już byśmy zahaczyli o procesy serwerowe/backendowe. O czym chciałbym więcej przeczytać i mieć więcej czasu na zabawę, więc będą za tydzień :)

Podsumowanie

Dziś przeszedłem przez kilka prostych opcji zlecenia pracy nowemu procesowi jak i komunikacji pomiędzy procesami. Wiedza ta, pozwoli nam zbudować proces serwerowy/backendowy, który będzie mógł działać w kółko i coś wykonywać. Jednak to już następnym razem. Będzie to nam potrzebne do napisania aplikacji, gdyż jedną z rzeczy jaką ona będzie musiała robić, to wykonywać równocześnie na kilku/kilkunastu procesach pewną operację która może trwać więcej niż kilka milisekund.

Poniżej w punktach to czego się dzisiaj dowiedzieliśmy:

  • spawn umożliwia wykonywanie poleceń w osobnym procesie
  • send umożliwia wysłanie wiadomości do określonego procesu podanego jako argument funkcji
  • każdy proces ma skrzynkę odbiorczą do której trafiają wiadomości wysłane przez send
  • receive do/end służy do odbierania wiadomości
  • receive działa jak case wykorzystując pattern matching do wiadomości
  • receive blokuje wykonanie kodu
  • after służy do określenia maksymalnego czasu oczekiwania na wiadomość w receive
  • Wiadomości ze skrzynki odbiorczej są obsługiwane na zasadzie First Come, First Serve.
  • Za każdym razem kiedy przyjdzie nowa wiadomość, wszystkie wiadomości w skrzynce odbiorczej są przetwarzane od początku (od najstarszej do najnowszej)