
Concurrency


JL Computer Consultancy

A mechanism for controlling concurrent (similar) processes

May 1999


A recent article described a way of emulating some aspects of the Oracle Parallel Query engine for processing very large tables in calculated chunks bounded by rowid ranges. The main purpose of that article was simply to split the table up so that each chunk could be updated independently with minimum contention and wasted I/O.

This article describes a general-purpose mechanism that can be used to allow multiple concurrent processes to run through a list of (similar) tasks grabbing one task at a time, processing, and then going back for the next available task until all the tasks are complete. In a follow-up article I will show how to combine the table-splitting tools with the concurrency package to produce a simple program that will:

        break up a table into a collection of chunks, then
        kick off an arbitrary number of concurrent processes to update randomly selected chunks

By combining the two mechanisms in this way you can set up a very simple framework for updating very large tables (or doing other large jobs) very rapidly with minimum contention whilst taking advantage of all the resources available to you.

The Strategy:

The basic idea is very simple: we define a table which identifies a list of tasks, and holds a flag identifying the state of that task. The states I cater for are: 'Awaiting processing (or New)', 'Being processed (or Active)', 'Completed', 'Error'. The procedures we need to create are essentially processes which grab a row in one state, change its state, and commit.
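As a concrete illustration, a minimal sketch of such a driving table follows. The table and column names are taken from the code extract later in this article; the datatypes, sizes and the single-character status flag are my assumptions, not the original definition:

```sql
create table parallel_allocation_list (
        driving_task    varchar2(30)    not null,       -- class of task
        payload         varchar2(40)    not null,       -- the task definition
        status          varchar2(1)     not null        -- New/Active/Complete/Error flag
);

-- An index to support the 'grab the next New row for this class' query
create index pal_idx on parallel_allocation_list(driving_task, status);
```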

The only important point in implementing these procedures is to ensure that they cater for concurrent usage. This is quite easy to manage by ensuring that all attempts to acquire a row from the table are made using 'select for update nowait', with a trap for 'resource busy' (Oracle error 54), and by issuing a commit as soon as the state change has been made. The following extract is the most significant section of code and demonstrates the idea:

The presence of the commit is probably the most important feature to consider when using this package. It is deliberate and, in my opinion, necessary to ensure that the mechanism cannot be incorrectly used to produce contention, long queues, deadlocks and accidental re-processing. However, it is important to remember (for example) that if the code you write to do a particular task fails, then you have to roll back your changes before you call the package to mark the task as Errored.


Code Sample

procedure allocate_target_item(
        i_driving_task          in      varchar2,
        o_payload                  out  varchar2,
        io_return_code          in out  number,
        io_error_message        in out  varchar2
) is
        m_rowid         rowid;
        m_payload       varchar2(40);
begin
        io_return_code := c_success;
        io_error_message := null;
        o_payload := null;
        commit;
        for m_loop in 1..5 loop
        begin
                select  rowid, payload
                into    m_rowid, m_payload
                from    parallel_allocation_list
                where   driving_task = i_driving_task
                and     status = c_awaiting_processing
                and     rownum = 1
                for update of status nowait;
                exit;           -- row acquired and locked; leave the retry loop
        exception
                when no_data_found then
                        io_return_code := c_no_rows_left;
                        io_error_message := sqlerrm;
                        return;
                when nowait_failed then
                        if m_loop = 5 then
                                io_return_code := c_row_locked;
                                io_error_message := sqlerrm;
                                return;
                        else
                                sys.dbms_lock.sleep(1);
                        end if;
                when others then
                        io_return_code := c_general_error;
                        io_error_message := sqlerrm;
                        return;
        end;
        end loop;
        update  parallel_allocation_list
        set     status = c_being_processed
        where   rowid = m_rowid;
        o_payload := m_payload;
        commit;
end allocate_target_item;
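The state-change procedures that follow a successful task all use the same pattern: update the row, then commit immediately. For completeness, here is a sketch of what the companion procedure that marks a task Complete might look like. The real package body is not reproduced in this article; the c_completed constant and the i_payload parameter are my assumptions, following the naming conventions of the extract above:

```sql
procedure complete_target_item(
        i_driving_task          in      varchar2,
        i_payload               in      varchar2,
        io_return_code          in out  number,
        io_error_message        in out  varchar2
) is
begin
        io_return_code := c_success;
        io_error_message := null;
        update  parallel_allocation_list
        set     status = c_completed
        where   driving_task = i_driving_task
        and     payload = i_payload
        and     status = c_being_processed;
        if sql%rowcount = 0 then
                io_return_code := c_general_error;
                io_error_message := 'Task not found in Active state';
        end if;
        commit;         -- release the row immediately, as in allocate_target_item
exception
        when others then
                io_return_code := c_general_error;
                io_error_message := sqlerrm;
end complete_target_item;
```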


The code was written on Oracle 7.2, so there are a few changes that could be made to improve the efficiency and reduce the volume of code. The most significant change I plan to make, though, is to the mechanism of the driving table: in theory each class of task could have its own table with its own payload type, but in the short term I created a single static table and added an extra column to identify the class of task (and hence the class of concurrent processes) that a row was relevant to.

One little warning: as you can see, the package backs off if the row it tried to grab was locked. To do this it calls the dbms_lock.sleep procedure, which is usually owned by SYS, so the id that owns my code has to have the privilege to execute that package granted to it.
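Note also that the nowait_failed exception trapped in the extract is not a predefined PL/SQL exception; it has to be declared somewhere in the package and bound to error -54. A sketch of the supporting declarations (the grant has to be made directly to the owning user, not through a role, for it to be visible inside stored PL/SQL; my_batch_owner is a hypothetical schema name):

```sql
-- In the package declaration section:
nowait_failed   exception;
pragma exception_init(nowait_failed, -54);      -- ORA-00054: resource busy

-- Run as SYS, before compiling the package:
grant execute on sys.dbms_lock to my_batch_owner;
```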

Apart from the obvious 'grab a row', the package contains a total of 10 procedures:

create_driver

Create (logically, but not actually) a driving table for a new class of tasks

drop_driver

Drop (logically) the driving table for a class of tasks

populate_driver

Insert one task definition (a.k.a. payload) into the driving table

allocate_target_item

Find a 'New' task and change it to 'Active', returning the payload to the caller

complete_target_item

Change an 'Active' task to 'Complete'

error_target_item

Change an 'Active' task to 'Errored'

free_target_item

Set an 'Active' task back to 'New' - to cater for doing one bit again

reset_all_targets

Change all 'Complete' tasks back to 'New' - to cater for a total batch restart

clear_error_for_target_item

Change an 'Errored' task back to 'New' - after fixing a problem

clear_all_errors

Change all 'Errored' tasks back to 'New' - after fixing a big problem.

Between them, these 10 procedures should allow you to create simple, but robust, batch processes to handle arbitrarily large groups of similar tasks with an arbitrary degree of concurrency. (My first use was for a batch run to turn 104 tables of weekly detail into 104 tables of summaries each Saturday).
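A typical worker session built on these procedures might loop as sketched below. This is an illustration only: concurrency_pkg is a hypothetical name for the package, do_one_task stands in for your own task code, and I have assumed that complete_target_item and error_target_item take the same (driving_task, payload, return_code, error_message) parameters as allocate_target_item:

```sql
declare
        m_payload       varchar2(40);
        m_return_code   number;
        m_error_message varchar2(255);
begin
        loop
                -- Grab the next New task for this class, marking it Active
                concurrency_pkg.allocate_target_item(
                        'WEEKLY_SUMMARIES', m_payload,
                        m_return_code, m_error_message);
                exit when m_return_code != concurrency_pkg.c_success;
                begin
                        do_one_task(m_payload);         -- your task code here
                        concurrency_pkg.complete_target_item(
                                'WEEKLY_SUMMARIES', m_payload,
                                m_return_code, m_error_message);
                exception
                        when others then
                                rollback;       -- undo the failed task first
                                concurrency_pkg.error_target_item(
                                        'WEEKLY_SUMMARIES', m_payload,
                                        m_return_code, m_error_message);
                end;
        end loop;
end;
/
```

Any number of sessions can run this block concurrently; the 'select for update nowait' inside allocate_target_item keeps them from colliding on the same row.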

There are three pieces of code to collect:

        create the driving table
        create the package header
        create the package body
