Notyfikacje są wykorzystywane w 'powiadomieniach' o zmianach na tabelach UPDATE/INSERT/DELETE/TRUNCATE. Dzięki temu inne procesy, których zadaniem jest sprawdzanie i np. przenoszenie zmian, nie muszą wykonywać czasem bardzo obciążających zapytań. W zależności od pojawienia się nowych notyfikacji, sprawdzają, przenoszą lub dokonują innych zmian.
Przykład wywołania notyfikacji poprzez trigger:
test=# \sf bucardo.bucardo_triggerkick_mysync CREATE OR REPLACE FUNCTION bucardo.bucardo_triggerkick_mysync() RETURNS trigger LANGUAGE plpgsql SECURITY DEFINER AS $function$ BEGIN EXECUTE 'NOTIFY "bucardo_kick_sync_mysync"'; IF TG_OP = 'TRUNCATE' THEN INSERT INTO bucardo.bucardo_truncate_trigger(tablename,sname,tname,sync) VALUES (TG_RELID, TG_TABLE_SCHEMA, TG_TABLE_NAME, 'mysync'); END IF; RETURN NEW; END; $function$
Aby wysyłać i odpytywać o notyfikację używamy poleceń:
- LISTEN channel_name - rejestrowanie bieżącej sesji do nasłuchiwanie na kanale o nazwie channel_name.
- UNLISTEN { channel_name | * } - przestanie nasłuchiwania na wybrany kanale (channel_name) lub na wszystkie, na których sesja nasłuchiwała (*)
- NOTIFY channel_name [, payload ] - wysyła notyfikację wraz z opcjonalnym parametrem playload, który jest stringiem o ograniczonej długości. Notyfikacje są widoczne dla wszystkich użytkowników, którzy nasłuchują.Dla standardowej konfiguracji payload musi być krótszy niż 8000 bajtów. Link do dokumentacji: http://www.postgresql.org/docs/current/static/sql-notify.html
Przykład prostego użycia w konsoli. Obydwa procesy nasłuchują na ten sam kanał. Pierwszy wysyła notyfikacja do wszystkich użytkowników:
root@test # notify test, 'cos tam'; NOTIFY Time: 111,433 ms Asynchronous notification "test" received from server process with PID 7031.Drugi proces został poinformowany o notyfikacji:
root@test # select 1; +----------+ | ?column? | +----------+ | 1 | +----------+ (1 row) Time: 108,875 ms Asynchronous notification "test" received from server process with PID 7031.
Cechy mechanizmu notyfikacji:
- Wiele backend procesów nasłuchuje na jednej maszynie z możliwością nasłuchiwania na wielu kanałach.
- Jest jedna centralna kolejka (FIFO) w głównym klastrze - katalog pg_notify, z odwzorowaniem aktywnie używanych stron w pamięci współdzielonej. Wszystkie notyfikacje są trzymane w tej kolejce i później odczytywane przez nasłuchujące backend procesy.
- Nie ma centralnej informacji na temat kto nasłuchuje na którym kanale - każdy backend proces ma swoją listę nasłuchiwanych kanałów.
- Notyfikacje są przypisywane do lokalnych baz danych czyli dwa backend procesy podłączone do różnych baz danych i nasłuchujące na kanałach o tych samych identyfikatorach nie będą się ze sobą komunikować - każda notyfikacja posiada OID bazy danych (ponieważ bazy mogą mieć różne kodowanie).
- Notyfikacje nie zostaną zachowywane przy zamknięciu serwera (crash, restart).
- Każdy backend proces, który nasłuchuje na przynajmniej jednym kanale, rejestruje swój PID w tablicy w AsyncQueueControl. Wtedy skanuje wszystkie przychodzące notyfikacje w centralnej kolejce i wpierw porównuje OID bazy danych swojego z OIDem bazy danych notyfikacji. Następnie porównuje kanał notyfikacji z kanałami na swojej liście nasłuchiwania. W ten sposób, gdy notyfikacja się zgodzi, jest zwracana na frontend procesu. Inne notyfikacje są pomijane.
- Polecenie NOTIFY przechowuje notyfikacje w liście backend procesu, które nie są procesowane do momentu gdy transakcja nie zostanie zakończona. Zduplikowane notyfikacje z tej samej transakcji są wysyłane jako jedna. Kiedy transakcja jest gotowa do zatwierdzenia, do głowy kolejki wysyłana jest oczekująca notyfikacja. Pozycja głowy kolejki wskazuje następną wolną pozycję i pozycja jest numerem strony i jej przesunięciem na stronie. Jest to wykonywane przed zatwierdzeniem transakcji w clog.
- Ilość pamięci współdzielonej używanej przy załączaniu notyfikacjami może być zmienna (NUM_ASYNC_BUFFERS) i nie wypływa na nic po za wydajnością. Przy standardowej instalacji wielkość kolejki wynosi 8GB.
Przykład programu, który wysyła powiadomienia i drugiego, który nasłuchuje (napisane w Perlu):
cat send_notification.pl #!/usr/bin/perl -w use strict; use warnings; use DBI; my $db_con = "dbi:Pg:dbname=test;host=127.0.0.1"; my $db_user = "root"; my $db_pass = "root"; my $db_attr = {RaiseError => 1, AutoCommit => 1}; my $queue_name = "test_listener"; my $iteration = 0; my $db_handler = DBI->connect($db_con, $db_user, $db_pass, $db_attr); $db_handler->do(qq{LISTEN $queue_name}); while (1) { $iteration += 1; my $message = qq{NOTIFY $queue_name, 'message number $iteration'}; $db_handler->do($message); print "Sending: $message\n"; sleep 1.5; }
Skrypt send_notification.pl co 1,5 sekundy wysyła notyfikację z nowym numerem na kanał test_listener:
perl send_notification.pl Sending: NOTIFY test_listener, 'message number 1' Sending: NOTIFY test_listener, 'message number 2' Sending: NOTIFY test_listener, 'message number 3' Sending: NOTIFY test_listener, 'message number 4' Sending: NOTIFY test_listener, 'message number 5' Sending: NOTIFY test_listener, 'message number 6'
A skrypt check_notification.pl co 1 sekundę (domyślnie, jeśli przy wywołaniu nie poda się inaczej) sprawdza czy nowe notyfikacji są:
cat check_notification.pl #!/usr/bin/perl -w use strict; use warnings; use DBI; my $db_con = "dbi:Pg:dbname=test;host=127.0.0.1"; my $db_user = "root"; my $db_pass = "root"; my $db_attr = {RaiseError => 1, AutoCommit => 1}; my $queue_name = "test_listener"; my $time = $ARGV[0] || 1; my $db_handler = DBI->connect($db_con, $db_user, $db_pass, $db_attr); $db_handler->do(qq{LISTEN $queue_name}); while (1) { my $notify = $db_handler->func("pg_notifies"); if ($notify) { my ($name, $pid, $payload) = @$notify; print "Reasived: $name, $pid, $payload \n" } sleep $time; }
Wynik:
perl check_notification.pl Reasived: test_listener, 19334, message number 1 Reasived: test_listener, 19334, message number 2 Reasived: test_listener, 19334, message number 3 Reasived: test_listener, 19334, message number 4 Reasived: test_listener, 19334, message number 5 Reasived: test_listener, 19334, message number 6
Skrypt do nasłuchiwania został uruchomiony przed wywołaniem skryptu do wysyłki notyfikacji, dlatego w wyniku widzimy wszystkie notyfikacje, które zostały wysłane. Jeśli uruchomilibyśmy go dopiero po skrypcie do wysyłki, tylko notyfikację wysłane po rejestracji procesu były by do niego dostarczone.
Brak komentarzy:
Prześlij komentarz