JL Computer Consultancy
A mechanism for controlling concurrent (similar) processes
May 1999
A recent article described a way of emulating some aspects of the Oracle Parallel Query engine for processing very large tables in calculated chunks bounded by rowid ranges. The main purpose of that article was simply to split the table up so that each chunk could be updated independently with minimum contention and wasted I/O.
This article describes a general-purpose mechanism that can be used to allow multiple concurrent processes to run through a list of (similar) tasks grabbing one task at a time, processing, and then going back for the next available task until all the tasks are complete. In a follow-up article I will show how to combine the table-splitting tools with the concurrency package to produce a simple program that will:
break up a table into a collection of chunks
kick off an arbitrary number of concurrent processes to update randomly selected chunks
By combining the two mechanisms in this way you can set up a very simple framework for updating very large tables (or doing other large jobs) very rapidly with minimum contention whilst taking advantage of all the resources available to you.
The Strategy:
The basic idea is very simple: we define a table which identifies a list of tasks, and holds a flag identifying the state of that task. The states I cater for are: 'Awaiting processing (or New)', 'Being processed (or Active)', 'Completed', 'Error'. The procedures we need to create are essentially processes which grab a row in one state, change its state, and commit.
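The actual DDL for the driving table is one of the scripts listed at the end of this article, but a minimal sketch of the shape implied by the description above (the column names driving_task, payload and status match the code extract that follows; the single-character datatype for the status flag is my assumption) would be:

```sql
create table parallel_allocation_list (
        driving_task    varchar2(40)    not null,       -- class of task
        payload         varchar2(40)    not null,       -- one task definition
        status          varchar2(1)     not null        -- New/Active/Complete/Error flag
);

-- Workers search by (class, state), so an index on that pair helps
create index pal_task_status on parallel_allocation_list(driving_task, status);
```

The payload is deliberately just a short string: it might be a rowid range, a table name, or anything else the worker code knows how to interpret.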
The only important thing about implementing these procedures is to ensure that they cater for concurrent usage; this is quite easy to manage by ensuring that all attempts to acquire a row from the table are done using 'select for update nowait', with a trap for 'resource busy' (Oracle error 54), and by issuing a commit as soon as the state change has been done. The following extract is the most significant section of code and demonstrates the idea:
The presence of the commit is probably the most important feature to consider when using this package. It is deliberate, and in my opinion necessary, to ensure that the mechanism cannot be incorrectly used to produce contention, long queues, dead-locks and accidental re-processing. However, it is important to remember (for example) that if the code you write to do a particular task fails, then you have to roll back your changes before you call the package to mark the task as Errored.
Code Sample
procedure allocate_target_item(
        i_driving_task          in      varchar2,
        o_payload               out     varchar2,
        io_return_code          in out  number,
        io_error_message        in out  varchar2
) is
        m_rowid         rowid;
        m_payload       varchar2(40);
begin
        io_return_code := c_success;
        io_error_message := null;
        o_payload := null;
        commit;                 -- release any locks the caller is still holding

        for m_loop in 1..5 loop
        begin
                select  rowid, payload
                into    m_rowid, m_payload
                from    parallel_allocation_list
                where   driving_task = i_driving_task
                and     status = c_awaiting_processing
                and     rownum = 1
                for     update of status nowait;

                exit;           -- we have locked a row, stop retrying

        exception
                when no_data_found then
                        io_return_code := c_no_rows_left;
                        io_error_message := sqlerrm;
                        return;
                when nowait_failed then         -- ORA-00054: resource busy
                        if m_loop = 5 then
                                io_return_code := c_row_locked;
                                io_error_message := sqlerrm;
                                return;
                        else
                                sys.dbms_lock.sleep(1);
                        end if;
                when others then
                        io_return_code := c_general_error;
                        io_error_message := sqlerrm;
                        return;
        end;
        end loop;

        update  parallel_allocation_list
        set     status = c_being_processed
        where   rowid = m_rowid;

        o_payload := m_payload;
        commit;
end allocate_target_item;
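The extract above assumes a handful of package-level declarations. Something like the following would be needed; the pragma mapping ORA-00054 is the only part fixed by Oracle, while the constant values shown here are purely illustrative:

```sql
        -- Map the 'resource busy' error onto a named exception
        nowait_failed           exception;
        pragma exception_init(nowait_failed, -54);

        -- Return codes (values are illustrative guesses)
        c_success               constant number := 0;
        c_no_rows_left          constant number := 1;
        c_row_locked            constant number := 2;
        c_general_error         constant number := 3;

        -- Status flags (single-character values are my assumption)
        c_awaiting_processing   constant varchar2(1) := 'N';
        c_being_processed       constant varchar2(1) := 'A';
```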
The code was written on Oracle 7.2, so there are a few changes that could be made to improve the efficiency and reduce the volume of code. The most significant change I plan to make, though, is in the mechanism of the driving table: in theory each class of task could have its own table with its own payload type, but in the short term I created a single static table and added an extra column to identify the class of task (hence the class of concurrent processes) that a row was relevant to.
One little warning - as you can see, the package does a back-off if the row it tried to grab was locked. To do this it calls the dbms_lock.sleep procedure, which is usually owned by SYS. The id that owns my code has to have the privilege to execute this package granted to it.
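Note that the grant has to be made directly to the owning schema, not through a role, because PL/SQL stored in a package cannot use privileges acquired via roles. Assuming the owning schema is called batch_owner (a hypothetical name), the DBA would run:

```sql
grant execute on sys.dbms_lock to batch_owner;
```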
Apart from the obvious 'grab a row', the package as a whole contains a total of 10 procedures:
create_driver - Create (logically, but not actually) a driving table for a new class of tasks
drop_driver - Drop (logically) the driving table for a class of tasks
populate_driver - Insert one task definition (a.k.a. payload) into the driving table
allocate_target_item - Find a 'New' task and change it to 'Active', returning the payload to the caller
complete_target_item - Change an 'Active' task to 'Complete'
error_target_item - Change an 'Active' task to 'Errored'
free_target_item - Set an 'Active' task back to 'New' - to cater for doing one bit again
reset_all_targets - Change all 'Complete' tasks back to 'New' - to cater for a total batch restart
clear_error_for_target_item - Change an 'Errored' task back to 'New' - after fixing a problem
clear_all_errors - Change all 'Errored' tasks back to 'New' - after fixing a big problem.
Between them, these 10 procedures should allow you to create simple, but robust, batch processes to handle arbitrarily large groups of similar tasks with an arbitrary degree of concurrency. (My first use was for a batch run to turn 104 tables of weekly detail into 104 tables of summaries each Saturday).
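A typical worker process then reduces to a simple loop. The sketch below assumes the package is called concurrency_pkg, that complete_target_item and error_target_item take parameter lists mirroring allocate_target_item, and that process_one_chunk is your own task-specific code - all three names are hypothetical:

```sql
declare
        m_payload       varchar2(40);
        m_return        number;
        m_message       varchar2(255);
begin
        loop
                concurrency_pkg.allocate_target_item(
                        'WEEKLY_SUMMARY', m_payload, m_return, m_message
                );
                exit when m_return != 0;        -- no rows left, or locked out

                begin
                        process_one_chunk(m_payload);   -- your task-specific code
                        concurrency_pkg.complete_target_item(
                                'WEEKLY_SUMMARY', m_payload, m_return, m_message
                        );
                exception
                        when others then
                                rollback;       -- undo the failed work first
                                concurrency_pkg.error_target_item(
                                        'WEEKLY_SUMMARY', m_payload, m_return, m_message
                                );
                end;
        end loop;
end;
/
```

Running several copies of this block in separate sessions gives you the arbitrary degree of concurrency: each session grabs, processes and releases tasks independently until the list is exhausted.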
There are three pieces of code to collect:
create the driving table
create the package header
create the package body