
PICK A TASK TO WORK ON

There are cases where a system stores a list of things to do, and some worker processes check the list, pick something to work on, do it, and remove it from the list.
The proper solution is to use some kind of queuing system. There is even PgQ, which works within PostgreSQL, but some people are not happy with it, as it requires compilation and installation. So they just use plain selects.
Will that work OK?
Let's imagine that our queue contains 10 thousand strings, and we want to calculate their MD5 checksums.
Since I want my test to be at least a bit realistic, I will also need a priority. Some tasks are more important than others. Let me assume that priority is based on the number of repetitions of the first character. So the string "ddwhatever" should be processed before "abc" (because of the repeated "d").
Additionally, I will need some timestamp, to simulate the fact that some rows have been inserted earlier and some later. For now, it will be pretty simple: I will choose some arbitrary timestamp, and then increment it by 1 second for every line.
Since I will be redoing the test multiple times, I'll pre-generate the list of strings with a simple Python script:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import string
import random
import time
 
 
def line_generator(use_time):
    size = 50
    chars = string.letters + string.digits
    random_string = ''.join(random.choice(chars) for x in range(size))
    priority = 49 - len(random_string.lstrip(random_string[0]))
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(use_time))
    return '%s\t%s\t%s\n' % (random_string, str(priority), timestamp)
 
start_time = time.time() - 100000000
f = file('/tmp/md5_data.txt', 'w')
for i in xrange(1, 10001):
    f.write(line_generator(start_time))
    start_time += 1
f.close()
The data looks like this:
1hEBJdCz8vpbiBv3oQWZzksmPyROyFRnqCHcxaQIOnf9Bt69DX 0 2010-06-30 03:21:15
3UeHKPjmzKhPOptZtE59XYpAbNcuryUhDW6lrqtNwoGL53kpg6 0 2010-06-30 03:21:16
VTboIg1uaYirybCDFwgBglOkdAV9QD20cafPkQso9vLsggU0WQ 0 2010-06-30 03:21:17
lC2R2dWlCQlEvhNuS991mnsmATeXlAwxvCE3lqmr64J4Eumd81 0 2010-06-30 03:21:18
PPuMHqGcQfxMlPJMBYlYXI4DwMWYqiuyjTeQCobBiDpTQp9kAv 1 2010-06-30 03:21:19
hMJYfwpgu29rR2fTAeGW5cIArEoQdI9kgzXYts4Ca294bCV96H 0 2010-06-30 03:21:20
oepH8Tq4ZrnbI957fnK1ElI6cEuIVZHVicUeHDtVB1dSUKu0iK 0 2010-06-30 03:21:21
gEsvbKdW27jUecvE8mPpwKfs7CMuP2GRxEbTPb8cUz4udIpz3q 0 2010-06-30 03:21:22
EbHEyj6WfV9YxLfWD5UBYiXvFfnHY2aOpX1YqOHQhyyMpNWjWR 0 2010-06-30 03:21:23
9wwIbPy3y1Ec2TlwgQPOXZQCrDzEnJfPZoAciZ2YsXOyMh7x73 0 2010-06-30 03:21:24
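To make the priority rule concrete, here is a minimal Python 3 sketch of it as a standalone function. The name leading_repeat_priority is mine, not part of the generator script; it is meant to give the same number as the "49 - len(...)" expression above for 50-character strings.

```python
def leading_repeat_priority(job):
    """Count how many extra times the first character repeats at the start.

    "abc" -> 0 (no repetition), "ddwhatever" -> 1 (one extra "d").
    """
    if not job:
        return 0
    # lstrip() removes every leading copy of the first character.
    remainder = job.lstrip(job[0])
    return len(job) - 1 - len(remainder)


if __name__ == "__main__":
    for sample in ("abc", "ddwhatever"):
        print(sample, leading_repeat_priority(sample))
```

For the sample rows shown above, this gives 1 for the line starting with "PPuMH" and 0 for the others, which matches the second column of the data.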
So, now, let's make the queue table:
create table queue (
    job text,
    priority int4,
    added_on timestamptz
);
\COPY queue from /tmp/md5_data.txt
With this in place, I'll add the index that will let me quickly find rows to process:
create index queue_idx on queue (priority desc, added_on asc);
OK. So the basic query to get the first row to process would be:
select * from queue order by priority desc, added_on asc limit 1;
                        job                         | priority |        added_on        
----------------------------------------------------+----------+------------------------
 66664o28k4haYpPdiLRB7uvh17kYPZA9zg2WIiYv2ka6TxqYAj |        3 | 2010-06-30 12:14:46+02
(1 row)
 
explain analyze
select * from queue order by priority desc, added_on asc limit 1;
                                                          QUERY PLAN                                                          
------------------------------------------------------------------------------------------------------------------------------
 Limit  (cost=0.29..0.36 rows=1 width=44) (actual time=0.036..0.036 rows=1 loops=1)
   ->  Index Scan using queue_idx on queue  (cost=0.29..770.28 rows=10000 width=44) (actual time=0.035..0.035 rows=1 loops=1)
 Total runtime: 0.079 ms
(3 rows)
OK. It works, and it clearly uses the index. So we're good.
Now. The first solution to queue processing is to write a simple script that will read the data from the queue, do the work on every row, and delete each row from the queue after the work is done.
Like this one:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys
 
 
def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()
 
conn = psycopg2.connect("port=5930 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    cur.execute('select * from queue order by priority desc, added_on asc limit 1')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed = processed + 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
 
cur.close()
conn.close()
print "processed %d rows." % (processed,)
I ran the script three times. Each of course processed 10k rows (between runs I refilled the table, with the same data). Average time was 63.822 seconds.
Now, we can parallelize it. What will happen if I run 2 processing scripts? Let's see:
=$ ./processor-1.py & ./processor-1.py & time wait
[1] 14231
[2] 14232
processed 10000 rows.
processed 10000 rows.
[1]-  Done                    ./processor-1.py
 
real    1m4.833s
user    0m4.304s
sys     0m2.940s
Whoa. This looks bad. Total time is more or less the same, but both of the scripts did process 10k rows? So each row was processed twice. That's not good.
The reason is that a plain "select" does not lock anything, so the row I just read is still fully available to the other processor.
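The effect can be reproduced without a database. The following is only a toy model in Python 3, not what PostgreSQL does internally: it assumes the workers run in lockstep (everybody selects, then everybody deletes), which is the worst case, and it assumes job names are unique. The function name run_without_locking is made up for this illustration.

```python
def run_without_locking(jobs, workers=2):
    """Toy model of several workers doing 'select ... limit 1' with no locks."""
    queue = list(jobs)
    processed = {w: [] for w in range(workers)}
    while queue:
        # Step 1: every worker selects the head of the queue and sees the same row.
        picked = {w: queue[0] for w in range(workers)}
        # Step 2: every worker processes its row and then deletes it.
        for w, job in picked.items():
            processed[w].append(job)
            if job in queue:
                queue.remove(job)  # later deletes of the same row remove nothing
    return processed


if __name__ == "__main__":
    result = run_without_locking(["job-a", "job-b", "job-c"])
    for worker, done in result.items():
        print("worker %d processed %d rows" % (worker, len(done)))
```

In this model every worker ends up processing every row, which is the same picture as the two "processed 10000 rows." lines above.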
So maybe I'll use "select for update" there? Make the cur.execute line:
    cur.execute('select * from queue order by priority desc, added_on asc limit 1 for update')
Result:
=$ ./processor-1.py & ./processor-1.py & time wait
[1] 14409
[2] 14410
processed 5000 rows.
processed 5000 rows.
[1]-  Done                    ./processor-1.py
[2]+  Done                    ./processor-1.py
 
real    1m3.487s
user    0m2.336s
sys     0m1.792s
Rows were now split correctly, but the total time didn't decrease. This is because when the first processor does the select for update, it locks the row, and the second processor can't skip that row, so it has to wait.
There is a "FOR UPDATE NOWAIT" version of the query, but the problem is that it raises an exception, which forces the application to roll back and retry. Not good.
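To show what that rollback-and-retry dance looks like, here is a rough Python 3 sketch. It is not something I benchmarked. It assumes a psycopg2-style connection that exposes the DB-API exception classes as attributes (conn.OperationalError), and it assumes the "could not obtain lock" error is reported as an OperationalError. The function name and the max_attempts and delay parameters are made up for this illustration.

```python
import time

QUERY = (
    "select * from queue "
    "order by priority desc, added_on asc limit 1 for update nowait"
)


def fetch_next_nowait(conn, max_attempts=5, delay=0.01):
    """Try to lock the head of the queue; roll back and retry if it is locked.

    Returns the row, or None when the queue is empty. Re-raises the lock
    error if every attempt fails.
    """
    for attempt in range(max_attempts):
        cur = conn.cursor()
        try:
            cur.execute(QUERY)
            return cur.fetchone()
        except conn.OperationalError:
            # The failed statement aborts the transaction, so it has to be
            # rolled back before this connection can run anything else.
            conn.rollback()
            if attempt == max_attempts - 1:
                raise
            time.sleep(delay)
        finally:
            cur.close()
    return None
```

Even when this works, every retry asks for the same first row again, so the workers still mostly fight over one row instead of moving on to the next one.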
Of course, I could add some random offset, so that usually not the first row would be selected, and the chances of interlocking would be small. But that doesn't sound like a sane solution, because we want the rows to be processed in order.
Now. Another solution would be to add a column, "in_process", which would be updated to true, and that change would be committed. And then, after processing is done, we can remove the row.
This would work, but it has one huge disadvantage. What will happen if the processing application dies without removing the row and without changing back the "in_process" flag?
The row would stay forever as "in_process".
Of course, we could add some maintenance, and make "in_process" not a boolean but a timestamp. This way, if something has been in process for too long (5 minutes?), we assume it has to be redone.
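The timestamp rule is simple enough to write down. This is just a sketch in Python 3 of the decision itself, not of the SQL that would go with it; the function name can_be_picked and the default of 5 minutes (my guess from the sentence above) are assumptions.

```python
from datetime import datetime, timedelta

STALE_AFTER = timedelta(minutes=5)  # the "5 minutes?" guess, not a tested value


def can_be_picked(in_process_since, now, stale_after=STALE_AFTER):
    """Decide whether a worker may take this row.

    in_process_since is None when nobody has claimed the row yet.
    """
    if in_process_since is None:
        return True
    # Claimed rows become available again once the claim is too old.
    return now - in_process_since > stale_after


if __name__ == "__main__":
    now = datetime(2010, 6, 30, 12, 0, 0)
    print(can_be_picked(None, now))
    print(can_be_picked(now - timedelta(minutes=1), now))
    print(can_be_picked(now - timedelta(minutes=6), now))
```

The weak spot is the timeout itself: a job that legitimately takes longer than the limit would be picked up a second time while the first worker is still busy with it.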
Not really cool. But there is another approach. Let's store not a timestamp but an integer, which marks which backend is processing the row. If that backend is no longer there, the row has to be redone.
Let's modify the table:
ALTER TABLE queue add column processed_by INT4;
Now, the script that processes rows is:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys
 
 
def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()
 
conn = psycopg2.connect("port=5930 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    cur.execute('''
        select *
        from queue
        where processed_by is null
            or not exists (select * from pg_stat_get_activity( processed_by ) )
        order by priority desc, added_on asc limit 1 for update
        ''')
    row = cur.fetchone()
    if row is None:
        break
    cur.execute('update queue set processed_by = pg_backend_pid() where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
 
    ignore = process_item(row)
    processed = processed + 1
 
    cur.execute('delete from queue where priority = %s and added_on = %s and processed_by = pg_backend_pid()', (row[1], row[2]))
    conn.commit()
 
cur.close()
conn.close()
print "processed %d rows." % (processed,)
Test run:
=$ ./processor-2.py & ./processor-2.py & time wait
[1] 16959
[2] 16960
processed 5000 rows.
[1]-  Done                    ./processor-2.py
processed 5000 rows.
[2]+  Done                    ./processor-2.py
 
real    2m6.649s
user    0m3.772s
sys     0m3.564s
What?
I checked the Pg logs. Previously, the summed time of all queries (select for update and delete) was about 114 seconds. Now it has skyrocketed to 240 seconds.
First thing – what will happen with more backends?
=$ for i in 1 2 3 4; do ./processor-2.py & done; time wait
[1] 17346
[2] 17347
[3] 17348
[4] 17349
processed 2087 rows.
processed 2676 rows.
processed 2859 rows.
processed 2378 rows.
[1]   Done                    ./processor-2.py
[3]-  Done                    ./processor-2.py
[4]+  Done                    ./processor-2.py
[2]+  Done                    ./processor-2.py
 
real    2m3.399s
user    0m3.064s
sys     0m2.444s
Why is it so slow? I looked at the Pg query log from the run with two processors, and found that 167 seconds (out of a total of 268) were taken by COMMIT, 39 seconds by "select for update", and 31 by "delete".
My guess is that by doing the extra update and commit we are simply forcing more IO (at the very least the xlog has to be fsynced).
I could have tested with unlogged tables, but I think that the queue table should be logged, so that it wouldn't vanish in case of a sudden system restart.
So it looks like we're stuck: processing in parallel takes the same time due to locks, and if we add information about the lock that other processors can bypass, it's even slower due to more writes.
Or are we?
I have written a couple of times about a magnificent thing called advisory locks. Let's see if they can help us now.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import psycopg2
import hashlib
import sys
 
 
def process_item(row):
    h = hashlib.md5()
    h.update(row[0])
    return h.hexdigest()
 
conn = psycopg2.connect("port=5930 host=127.0.0.1 user=depesz dbname=depesz")
cur = conn.cursor()
processed = 0
while True:
    cur.execute('''
    select *
    from queue
    where pg_try_advisory_xact_lock(123, hashtext( priority::text || added_on::text ) )
    order by priority desc, added_on asc limit 1 for update
    ''')
    row = cur.fetchone()
    if row is None:
        break
    ignore = process_item(row)
    processed = processed + 1
    cur.execute('delete from queue where priority = %s and added_on = %s', (row[1], row[2]))
    conn.commit()
 
cur.close()
conn.close()
print "processed %d rows." % (processed,)
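The important part is that pg_try_advisory_xact_lock() does not wait: if somebody else holds the lock, it just returns false, and the where clause moves on to the next row. Here is a toy model of that behaviour in Python 3. It is a simplification, not how PostgreSQL implements it: workers run in rounds, the job itself stands in for the hashtext(...) lock key, jobs are assumed to be unique, and the function name run_with_try_lock is made up.

```python
def run_with_try_lock(jobs, workers=4):
    """Toy model: each worker takes the first row whose lock it can get."""
    queue = list(jobs)
    processed = {w: [] for w in range(workers)}
    while queue:
        held = {}  # lock key -> worker holding it during this round
        for w in range(workers):
            # Scan in queue order, like the index scan on (priority, added_on).
            for job in queue:
                if job not in held:  # the try-lock succeeds
                    held[job] = w
                    break
                # Otherwise the try-lock fails at once and we look at the next row.
        # Every worker processes and deletes its own row; commit frees the locks.
        for job, w in held.items():
            processed[w].append(job)
            queue.remove(job)
    return processed


if __name__ == "__main__":
    result = run_with_try_lock(range(10), workers=4)
    for worker, done in result.items():
        print("worker %d processed %s" % (worker, done))
```

Nobody waits and no row is handed out twice, which is the combination that plain select and select for update could not give.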
First test:
=$ for i in 1 2; do ./processor-3.py & done; time wait
[1] 20100
[2] 20101
processed 5000 rows.
[1]-  Done                    ./processor-3.py
processed 5000 rows.
[2]+  Done                    ./processor-3.py
 
real    1m2.254s
user    0m2.656s
sys     0m2.144s
It's actually not bad. A bit faster than serial processing. And what if I add more processors?
=$ for i in 1 2 3 4; do ./processor-3.py & done; time wait
[1] 20203
[2] 20204
[3] 20205
[4] 20206
processed 2500 rows.
processed 2499 rows.
processed 2501 rows.
processed 2500 rows.
[1]   Done                    ./processor-3.py
[2]   Done                    ./processor-3.py
[4]+  Done                    ./processor-3.py
 
real    0m30.911s
user    0m1.936s
sys     0m1.508s
Now this is nice. Please note that the distribution is not perfectly even (one of the processors did 2499 rows, two did 2500, and one did 2501), but it is very close. Which is great.
Finally, let's up the count one more time, to see how it will work out:
=$ for i in {1..8}; do ./processor-3.py & done; time wait
...
processed 1249 rows.
processed 1250 rows.
processed 1250 rows.
processed 1250 rows.
processed 1250 rows.
processed 1250 rows.
processed 1251 rows.
processed 1250 rows.
...
real    0m14.704s
user    0m2.148s
sys     0m1.112s
It's all good.
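For the record, here is the speedup worked out from the timings above, as a few lines of Python 3. The numbers are the ones from this post (63.822 seconds on average for the serial run, and the "real" times of the advisory lock runs); only the variable names are mine.

```python
serial_seconds = 63.822  # average of the three single-processor runs

# "real" wall-clock times of the advisory lock runs, by number of processors
parallel_seconds = {2: 62.254, 4: 30.911, 8: 14.704}

speedup = {n: serial_seconds / t for n, t in parallel_seconds.items()}

for n in sorted(speedup):
    print("%d processors: %.2f times faster than the serial run" % (n, speedup[n]))
```

So two processors barely helped in my test, while going from 2 to 4 and from 4 to 8 roughly halved the time at each step.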
Now, if you can, please do use a proper solution (PgQ or something else with "mq" in the name).
But if you can't, you can actually get pretty good results with some advisory locks and simple SQL queries. Without even PL/pgSQL.
