poniedziałek, 14 lipca 2014

Asynchroniczne powiadomienia w PostgreSQLu czyli NOTIFY i LISTEN

Wiedzieliście, że PostgreSQL oferuje mechanizm wysyłki asynchronicznych notyfikacji? Aż głupio przyznać, ale ja się dowiedziałam o tym przy okazji poznawania narzędzia do master-master replikacji - Bucardo, ale o tym narzędziu kiedy indziej.
Notyfikacje są wykorzystywane w 'powiadomieniach' o zmianach na tabelach UPDATE/INSERT/DELETE/TRUNCATE. Dzięki temu inne procesy, których zadaniem jest sprawdzanie i np. przenoszenie zmian, nie muszą wykonywać czasem bardzo obciążających zapytań. W zależności od pojawienia się nowych notyfikacji, sprawdzają, przenoszą lub dokonują innych zmian.

Przykład wywołania notyfikacji poprzez trigger:

test=# \sf bucardo.bucardo_triggerkick_mysync
CREATE OR REPLACE FUNCTION bucardo.bucardo_triggerkick_mysync()
 RETURNS trigger
 LANGUAGE plpgsql
 SECURITY DEFINER
AS $function$
BEGIN
 EXECUTE 'NOTIFY "bucardo_kick_sync_mysync"';
 IF TG_OP = 'TRUNCATE' THEN
   INSERT INTO bucardo.bucardo_truncate_trigger(tablename,sname,tname,sync)
   VALUES (TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME, 'mysync');
 END IF;
 RETURN NEW;
END;
$function$



Aby wysyłać i odpytywać o notyfikację używamy poleceń:
  • LISTEN channel_name - rejestrowanie bieżącej sesji do nasłuchiwanie na kanale o nazwie channel_name.
  • UNLISTEN { channel_name | * } - przestanie nasłuchiwania na wybrany kanale (channel_name) lub na wszystkie, na których sesja nasłuchiwała (*)
  • NOTIFY channel_name [, payload ] - wysyła notyfikację wraz z opcjonalnym parametrem playload, który jest stringiem o ograniczonej długości. Notyfikacje są widoczne dla wszystkich użytkowników, którzy nasłuchują.Dla standardowej konfiguracji payload musi być krótszy niż 8000 bajtów. Link do dokumentacji: http://www.postgresql.org/docs/current/static/sql-notify.html

Przykład prostego użycia w konsoli. Obydwa procesy nasłuchują na ten sam kanał. Pierwszy wysyła notyfikacja do wszystkich użytkowników:

root@test # notify test, 'cos tam';
NOTIFY
Time: 111,433 ms
Asynchronous notification "test" received from server process with PID 7031.

Drugi proces został poinformowany o notyfikacji:


root@test # select 1;
+----------+
| ?column? |
+----------+
|        1 |
+----------+
(1 row)

Time: 108,875 ms
Asynchronous notification "test" received from server process with PID 7031.



Cechy mechanizmu notyfikacji:
  • Wiele backend procesów nasłuchuje na jednej maszynie z możliwością nasłuchiwania na wielu kanałach.
  • Jest jedna centralna kolejka (FIFO) w głównym klastrze - katalog pg_notify, z odwzorowaniem aktywnie używanych stron w pamięci współdzielonej. Wszystkie notyfikacje są trzymane w tej kolejce i później odczytywane przez nasłuchujące backend procesy.
  • Nie ma centralnej informacji na temat kto nasłuchuje na którym kanale - każdy backend proces ma swoją listę nasłuchiwanych kanałów.
  • Notyfikacje są przypisywane do lokalnych baz danych czyli dwa backend procesy podłączone do różnych baz danych i nasłuchujące na kanałach o tych samych identyfikatorach nie będą się ze sobą komunikować - każda notyfikacja posiada OID bazy danych (ponieważ bazy mogą mieć różne kodowanie).
  • Notyfikacje nie zostaną zachowywane przy zamknięciu serwera (crash, restart).
  • Każdy backend proces, który nasłuchuje na przynajmniej jednym kanale, rejestruje swój PID w tablicy w AsyncQueueControl. Wtedy skanuje wszystkie przychodzące notyfikacje w centralnej kolejce i wpierw porównuje OID bazy danych swojego z OIDem bazy danych notyfikacji. Następnie porównuje kanał notyfikacji z kanałami na swojej liście nasłuchiwania. W ten sposób, gdy notyfikacja się zgodzi, jest zwracana na frontend procesu. Inne notyfikacje są pomijane. 
  • Polecenie NOTIFY przechowuje notyfikacje w liście backend procesu, które nie są procesowane do momentu gdy transakcja nie zostanie zakończona. Zduplikowane notyfikacje z tej samej transakcji są wysyłane jako jedna. Kiedy transakcja jest gotowa do zatwierdzenia, do głowy kolejki wysyłana jest oczekująca notyfikacja. Pozycja głowy kolejki wskazuje następną wolną pozycję i pozycja jest numerem strony i jej przesunięciem na stronie. Jest to wykonywane przed zatwierdzeniem transakcji w clog.
  • Ilość pamięci współdzielonej używanej przy załączaniu notyfikacjami może być zmienna (NUM_ASYNC_BUFFERS) i nie wypływa na nic po za wydajnością. Przy standardowej instalacji wielkość kolejki wynosi 8GB.


Przykład programu, który wysyła powiadomienia i drugiego, który nasłuchuje (napisane w Perlu):

cat send_notification.pl
#!/usr/bin/perl -w

use strict;
use warnings;
use DBI;

my $db_con = "dbi:Pg:dbname=test;host=127.0.0.1";
my $db_user = "root";
my $db_pass = "root";
my $db_attr = {RaiseError => 1, AutoCommit => 1};
my $queue_name = "test_listener";
my $iteration = 0;

my $db_handler = DBI->connect($db_con, $db_user, $db_pass, $db_attr);

$db_handler->do(qq{LISTEN $queue_name});

while (1) {
    $iteration += 1;
    my $message = qq{NOTIFY $queue_name, 'message number $iteration'};
    $db_handler->do($message);
    print "Sending: $message\n";
    sleep 1.5;
}


Skrypt send_notification.pl co 1,5 sekundy wysyła notyfikację z nowym numerem na kanał test_listener:

perl send_notification.pl
Sending: NOTIFY test_listener, 'message number 1'
Sending: NOTIFY test_listener, 'message number 2'
Sending: NOTIFY test_listener, 'message number 3'
Sending: NOTIFY test_listener, 'message number 4'
Sending: NOTIFY test_listener, 'message number 5'
Sending: NOTIFY test_listener, 'message number 6'


A skrypt check_notification.pl co 1 sekundę (domyślnie, jeśli przy wywołaniu nie poda się inaczej) sprawdza czy nowe notyfikacji są:

cat check_notification.pl
#!/usr/bin/perl -w

use strict;
use warnings;
use DBI;

my $db_con = "dbi:Pg:dbname=test;host=127.0.0.1";
my $db_user = "root";
my $db_pass = "root";
my $db_attr = {RaiseError => 1, AutoCommit => 1};
my $queue_name = "test_listener";
my $time = $ARGV[0] || 1;

my $db_handler = DBI->connect($db_con, $db_user, $db_pass, $db_attr);

$db_handler->do(qq{LISTEN $queue_name});

while (1) {
    my $notify = $db_handler->func("pg_notifies");
    if ($notify) {
        my ($name, $pid, $payload) = @$notify;
        print "Reasived: $name, $pid, $payload \n"
    }
    sleep $time;
}


Wynik:

perl check_notification.pl
Reasived: test_listener, 19334, message number 1 
Reasived: test_listener, 19334, message number 2 
Reasived: test_listener, 19334, message number 3 
Reasived: test_listener, 19334, message number 4 
Reasived: test_listener, 19334, message number 5 
Reasived: test_listener, 19334, message number 6

Skrypt do nasłuchiwania został uruchomiony przed wywołaniem skryptu do wysyłki notyfikacji, dlatego w wyniku widzimy wszystkie notyfikacje, które zostały wysłane. Jeśli uruchomilibyśmy go dopiero po skrypcie do wysyłki, tylko notyfikację wysłane po rejestracji procesu były by do niego dostarczone.

Brak komentarzy:

Prześlij komentarz