Kolejkowanie zadań - cron, postgresql, aplikacja webowa

Sytuacja: Jest skrypt odpalany regularnie z crona. Skrypt robi "coś", dla utrudnienia powiedzmy, że w kilku rodzajach, a wyniki wrzuca do bazy danych (w tym przypadku: postgresql'a). Dane z bazy są prezentowane w aplikacji webowej. Skrypt działa dość długo, powiedzmy, że około godziny. Jednocześnie powinna działać tylko jedna instancja, ale ponieważ jest kilka rodzajów "czegoś", to w zależności od rodzaju, zadanie musi być uruchamiane z różnym interwałem, może być też jednorazowe.

Problem: Skrypt może być wywoływany "według planu" - regularnie, albo "on demand" - użytkownik aplikacji stwierdza, że skrypt ma wykonać "teraz zaraz" jakiś rodzaj operacji. Jak więc zarządzać momentem wywołania skryptu z crona i aplikacji jednocześnie?

Rozwiązanie: Do zarządzania momentem wykonania skryptu wykorzystamy pośrednika: bazę danych. Utworzymy dwie tabele: jedną do trzymania rodzajów zadań (z określeniem co jaki interwał zadanie ma być wykonywane) oraz tabelę, w której będą zapisane timestampy wykonania i status wykonania. Do blokowania wykonania wykorzystamy mechanizm advisory lock PostgreSQL'a. (http://www.postgresql.org/docs/8.3/interactive/functions-admin.html).

Advisory lock pozwala ustawić flagę, oznaczoną dowolną liczbą całkowitą, widoczną w całej bazie. Ustawiona flaga jest przypisana do połączenia, i jeśli nie zostanie zdjęta w trakcie połączenia, to po zakończeniu baza automatycznie zdejmie flagę. Do ustawienia advisory locka można użyć funkcji blokującej pg_advisory_lock(int) (jeśli flaga jest ustawiona w innej sesji, to funkcja zakończy działanie dopiero w momencie zdjęcia flagi), albo nieblokującej pg_try_advisory_lock(int) (funkcja zwróci false, jeśli flaga jest już zajęta). Advisory lock jest więc bardzo przydatny (i szybszy niż działanie na tabeli) do synchronizowania wielu procesów jednocześnie.
Dodatkowo możemy rozróżnić typ zadań (regularne, na życzenie) za pomocą dodatkowej kolumny. W przypadku zadań regularnych, zadanie powinno się wykonać regularnie

Tabele:

create table scheduler_type(
sht_id serial primary key, -- identyfikator typu
sht_name text not null, -- nazwa typu
sht_repeat_interval interval, -- wielkość interwału między uruchomieniami
sht_lock int unique -- id advisory locka. jeśli null, to nie blokujemy
);


create table scheduler (
shd_id serial primary key, -- identyfikator zadania
shd_created timestamptz not null default current_timestamp, -- utworzenie zadania
shd_scheduled timestamptz not null default current_timestamp, -- czas, w którym powinno zostać wykonane zadanie
shd_start timestamptz, -- start zadania
shd_finish timestamptz, -- koniec wykonania zadania
shd_sht_id int references scheduler_type(sht_id), -- typ zadania
shd_repeat boolean not null default true, -- flaga, czy jeśli typ zadania ma ustawiony interwał, to czy ma się odnawiać
shd_result text -- rezultat uruchomienia taska
);

mały widok..


create or replace view v_scheduler as
select t.*, s.*
from scheduler_type t, scheduler r
where t.sht_id = r.shd_sht_id;

oraz funkcje pomocnicze:

dodawanie zadania do kolejki (zwraca true, jeśli zadanie zostało wstawione)


create or replace function schedule_job(start timestamptz, job_type text, job_repeat boolean) returns boolean as
$$
declare
sched v_scheduler%ROWTYPE;
nstart timestamptz;
begin
-- nie pozwalamy wstawic jobsow w przeszlosci. jesli timestamp jest starszy, to wychodzimy
if nstart < current_timestamp then
return false;
end if;
-- nie można dodać, jeśli jest już task na przyszłość, oraz jeśli typ roboty jest taki sam
select into sched * from v_scheduler where shd_executed is null and sht_name = job_type;
if found then
return false;
end if;
insert into schedule ( shd_scheduled, shd_type, shd_repeat) values (nstart, job_type, job_repeat);
return true;
end;
$$ language plpgsql;

sprawdzanie, czy jest zadanie (zwraca id wiersza z zadaniem)


create or replace function schedule_start() returns int as
$$
declare
sched v_scheduler%ROWTYPE;
adv_lock int;
begin
-- czy lock trzymany przez inną sesję?
if not pg_try_advisory_lock(adv_lock) then
return null;
end if;
-- szukamy, czy są jakieś roboty do roboty
select into sched * from v_scheduler where shd_start is null and shd_scheduled <=current_timestamp order by shd_scheduled limit 1;
-- nie znaleźliśmy żadnych. wychodzimy
if not found then
return null;
end if;
-- jeśli zadanie wymaga lockowania, to sprawdzamy, czy lock jest zajęty
if sched.sht_lock is not null then
if not pg_try_advisory_lock(adv_lock) then
return null;
end if;
end if;
-- uaktualniamy
update schedule set shd_start = current_timestamp where shd_id = sched.shd_id;
-- jeśli wykonywana robota jest powtarzalna, dodajemy następne wywołanie. tym unikamy potrzeby 'zaludnienia' tabeli przez zadania powtarzalne na okres czasu wprzód.
if sched.sht_repeat_interval is not null and sched.shd_repeat then
perform schedule_job(sched.shd_scheduled + sched.sht_repeat_interval, sched.sht_name) ;
end if;
return sched.shd_id;
end;
$$ language plpgsql;


create or replace function schedule_end(schedule_id int, result text ) returns int as
$$
declare
sched v_scheduler%ROWTYPE;
begin
select into sched * from v_scheduler where shd_id = schedule_id;
-- uaktualniamy
update schedule set shd_finished = current_timestamp, shd_result = result where shd_id = schedule_id and shd_finished is null;
if sched.sht_lock is not null then
perform pg_advisory_unlock(sched.sht_lock);
end if;
return sched.shd_id;
end;
$$ language plpgsql;

W skrypcie wykorzystamy pomocniczą klasę Checker, która wymaga osobnego połączenia z bazą.


class Checker(object):
  def __init__(self, connection):
   self.connection = connection
  def check(self):
   cursor = self.connection.cursor()
   cursor.execute('select * from check_schedule()')
   out = cursor.fetchone()[0]
   self.schedule_id = out
   return out
  def release(self, result):
   cursor = self.connection.cursor()
   cursor.execute('select * from finish_schedule(%s, %s)', (self.schedule_id, result,))
   out = cursor.fetchone()[0]
   return out

Potem tylko sprawdzamy na początku skryptu:


checker = Checker(connection)
if checker.check():
...tu wykonujemy resztę skryptu i przekazujemy wynik do zmiennej result..
checker.release(result)

Jak to działa?

- Najpierw tworzymy typy zadań wstawiając wiersz do scheduler_type. Określamy unikalną nazwę, ewentualną częstotliwość wykonywania i określamy id dla advisory lock'a, jeśli chcemy, żeby wykonywane było tylko jedno zadanie na raz. Id może być dowolną liczbą całkowitą. Ważne, żeby nie pokrywało się z innymy lockami.

- Aby skrypt zaczął wykonywać zadania, trzeba ustawić zadania startowe:
select schedule_job(current_timestamp, nazwa_zadania, true).

- W cronie ustawiamy częste wykonywanie skryptu (np. co minutę). Za każdym razem skrypt będzie na początku łączył się z bazą, wykonywał check_schedule().

- Funkcja sprawdza, czy jest ustawiony przyjęty advisory lock. Jeśli jest, to kończy działanie i zwraca false. Potem sprawdza, czy są zaplanowane jakieś zadania. Jeśli nie, kończy false'm. Jeśli są zadania, bierze pierwsze w kolejce, uaktualnia wiersz zadania ustawiając timestamp w polu shd_start i zwraca id wiersza. Funkcja kończy działanie. Jeśli ustawiony został advisory lock, to następne wywołania skryptu z crona zakończą się bez odpalania własciwego kodu skryptu.

- Jeśli baza nie zwróciła false, to skrypt zaczyna właściwe działanie i działa, działa, działa... Na zakończenie obiekt Checker zapisuje w bazie rezultat działania i timestamp zakończenia działania. Z zakończeniem działania skryptu kończone jest połączenie i ewentualnie zdejmowany jest lock.

- Jeśli użytkownik aplikacji stwierdzi, że chce wykonać skrypt "ręcznie", to dodaje zadanie ad hoc: select schedule_job(current_timestamp, nazwa_zadania, false).

Cykl życia wygląda mniej więcej tak:

cron: |--CS--CN--CN--CN-F-CN--CN--CU--CN--CN--FCN-CN--CN-CS-CN--CN-F-CN|
baza: |-I-JnllllllllllllF----------U--JullllllllF------- JnllllllllF---|

C - Wykonanie zadania w cronie
N - Nic do roboty. Jest ustawiony lock, albo nie ma zadania w tabeli.
S - start zadania
I - dodanie zadania cyklicznego
U - dodanie zadania usera
Jn - start cyklicznego zadania w bazie - lock, dodanie następnego zadania cykliczneg
Ju - start zadania usera w bazie - lock, bez dodania kolejnego zadania
l - ustawiony advisory lock
F - koniec zadania, zapisanie timestampa, rezultatu, zdjęcie flagi.