SQL> CREATE TYPE message_type AS OBJECT ( message_body VARCHAR2(50) ); 2 / Type created. |
SQL> BEGIN 2 dbms_aqadm.create_queue_table(queue_table => 'queue_table', 3 queue_payload_type => 'message_type'); 4 END; 5 / PL/SQL procedure successfully completed. |
SQL> DESC queue_table Name Null? Type ----------------------------------------- -------- --------------------------- Q_NAME VARCHAR2(30) MSGID NOT NULL RAW(16) CORRID VARCHAR2(128) PRIORITY NUMBER STATE NUMBER DELAY DATE EXPIRATION NUMBER TIME_MANAGER_INFO DATE LOCAL_ORDER_NO NUMBER CHAIN_NO NUMBER CSCN NUMBER DSCN NUMBER ENQ_TIME DATE ENQ_UID NUMBER ENQ_TID VARCHAR2(30) DEQ_TIME DATE DEQ_UID NUMBER DEQ_TID VARCHAR2(30) RETRY_COUNT NUMBER EXCEPTION_QSCHEMA VARCHAR2(30) EXCEPTION_QUEUE VARCHAR2(30) STEP_NO NUMBER RECIPIENT_KEY NUMBER DEQUEUE_MSGID RAW(16) SENDER_NAME VARCHAR2(30) SENDER_ADDRESS VARCHAR2(1024) SENDER_PROTOCOL NUMBER USER_DATA MESSAGE_TYPE |
SQL> BEGIN 2 dbms_aqadm.create_queue( queue_name => 'example_queue', 3 queue_table => 'queue_table' ); 4 END; 5 / PL/SQL procedure successfully completed. |
SQL> BEGIN 2 dbms_aqadm.start_queue(queue_name=>'example_queue'); 3 END; 4 / PL/SQL procedure successfully completed. |
-- p_enqueue: wraps the supplied text in a MESSAGE_TYPE object and enqueues it
-- on 'example_queue'.
--   msg : text passed to the message_type constructor as the payload body.
-- The enqueue options and message properties records are passed unmodified
-- (i.e. with whatever defaults DBMS_AQ gives them).
-- Declared as an autonomous transaction and commits before returning.
CREATE OR REPLACE PROCEDURE p_enqueue(msg IN VARCHAR2)
IS
  PRAGMA AUTONOMOUS_TRANSACTION;

  l_enq_opts   dbms_aq.enqueue_options_t;
  l_msg_props  dbms_aq.message_properties_t;
  l_msg_id     RAW(16);                        -- receives the id assigned to the message
  l_payload    message_type := message_type(msg);
BEGIN
  dbms_aq.enqueue(
    queue_name         => 'example_queue',
    enqueue_options    => l_enq_opts,
    message_properties => l_msg_props,
    payload            => l_payload,
    msgid              => l_msg_id
  );

  -- The autonomous transaction must be ended before control returns.
  COMMIT;
END;
/
-- p_dequeue: dequeues one message from 'example_queue' and prints its body
-- through DBMS_OUTPUT (prefix 'Message : ').
-- The dequeue options record is passed unmodified, so DBMS_AQ's default
-- dequeue mode and wait behaviour apply.
-- Declared as an autonomous transaction and commits before returning.
CREATE OR REPLACE PROCEDURE p_dequeue
IS
  PRAGMA AUTONOMOUS_TRANSACTION;

  l_deq_opts   dbms_aq.dequeue_options_t;
  l_msg_props  dbms_aq.message_properties_t;
  l_msg_id     RAW(16);        -- receives the id of the dequeued message
  l_payload    message_type;   -- receives the dequeued object
BEGIN
  dbms_aq.dequeue(
    queue_name         => 'example_queue',
    dequeue_options    => l_deq_opts,
    message_properties => l_msg_props,
    payload            => l_payload,
    msgid              => l_msg_id
  );

  dbms_output.put_line('Message : ' || l_payload.message_body);

  -- The autonomous transaction must be ended before control returns.
  COMMIT;
END p_dequeue;
/
SQL> set serverout on SQL> EXEC p_enqueue('THIS IS A TEST MESSAGE'); PL/SQL procedure successfully completed. SQL> EXEC p_dequeue Message : THIS IS A TEST MESSAGE PL/SQL procedure successfully completed. |
-- p_dequeue: same as the basic dequeue procedure, but sets the dequeue mode
-- explicitly to DBMS_AQ.REMOVE before calling DBMS_AQ.DEQUEUE.
-- Prints the message body through DBMS_OUTPUT (prefix 'Message : ').
-- Declared as an autonomous transaction and commits before returning.
CREATE OR REPLACE PROCEDURE p_dequeue
IS
  PRAGMA AUTONOMOUS_TRANSACTION;

  l_deq_opts   dbms_aq.dequeue_options_t;
  l_msg_props  dbms_aq.message_properties_t;
  l_msg_id     RAW(16);        -- receives the id of the dequeued message
  l_payload    message_type;   -- receives the dequeued object
BEGIN
  -- Explicitly choose the dequeue mode instead of relying on the default.
  l_deq_opts.dequeue_mode := DBMS_AQ.REMOVE;

  dbms_aq.dequeue(
    queue_name         => 'example_queue',
    dequeue_options    => l_deq_opts,
    message_properties => l_msg_props,
    payload            => l_payload,
    msgid              => l_msg_id
  );

  dbms_output.put_line('Message : ' || l_payload.message_body);

  -- The autonomous transaction must be ended before control returns.
  COMMIT;
END p_dequeue;
Dequeue Mode | Meaning |
DBMS_AQ.LOCKED | Message is read and locked by the dequeue operation. No other process can see the message until a COMMIT or ROLLBACK occurs. |
DBMS_AQ.BROWSE | Message is read, but other processes can still see the message. There is no guarantee that a message that has been BROWSEd will still be available afterwards, because another session may have removed it in the meantime. |
DBMS_AQ.REMOVE | Message is read and removed immediately (but can be retained depending upon the retention properties of the queue). This is the default. |
DBMS_AQ.REMOVE_NODATA | Message is "marked" as read and updated or deleted. This is useful if you've already BROWSEd or LOCKED the message and now want to remove it without the overhead of retrieving the full message payload. |
Wait Mode | Meaning |
DBMS_AQ.FOREVER | Dequeue operation waits until a message of the correct criteria is enqueued. This is the default. |
DBMS_AQ.NO_WAIT | Does not wait at all. Note that the exception ORA-25228 will be raised if no messages are available. You will probably want to handle this in the dequeue procedure. |
Number of Seconds | Waits the specified number of seconds before raising the ORA-25228 exception if no message becomes available. |
LOOP
  Attempt dequeue with wait FOREVER, which blocks until a message appears;
  process_message;
END LOOP;

The advantages are:
SQL> BEGIN 2 dbms_aqadm.create_queue_table(queue_table => 'queue_table', 3 queue_payload_type => 'message_type', 4 multiple_consumers=>TRUE); 5 END; 6 / |
SQL> BEGIN 2 dbms_aqadm.create_queue_table(queue_table => 'queue_table', 3 queue_payload_type => 'message_type', 4 multiple_consumers=>TRUE); 5 END; 6 / BEGIN * ERROR at line 1: ORA-24166: evaluation context ORAUSER.AQ$_QUEUE_TABLE_V has errors ORA-01925: maximum of 30 enabled roles exceeded ORA-06512: at "SYS.DBMS_AQADM_SYS", line 2224 ORA-06512: at "SYS.DBMS_AQADM", line 58 ORA-06512: at line 2
In this case, you need to increase the MAX_ENABLED_ROLES initialisation parameter, or reduce the number of roles that are assigned to the user.
-- p_dequeue: callback-style dequeue procedure. It dequeues the specific
-- message identified by the descriptor it is handed and records the message
-- body in output_table.
--
-- Parameters (the body reads only descr):
--   context  : not used by this procedure.
--   reginfo  : not used by this procedure.
--   descr    : supplies msg_id, consumer_name and queue_name for the dequeue.
--   payload  : not used by this procedure.
--   payloadl : not used by this procedure.
--
-- Commits after the INSERT.
CREATE OR REPLACE PROCEDURE p_dequeue (
  context   RAW,
  reginfo   sys.aq$_reg_info,
  descr     sys.aq$_descriptor,
  payload   RAW,
  payloadl  NUMBER
)
AS
  dequeue_options     dbms_aq.dequeue_options_t;
  message_properties  dbms_aq.message_properties_t;
  message_handle      RAW(16);
  message             message_type;
BEGIN
  -- Dequeue exactly the message, and on behalf of the consumer, named in
  -- the descriptor rather than whatever happens to be next on the queue.
  dequeue_options.msgid         := descr.msg_id;
  dequeue_options.consumer_name := descr.consumer_name;

  dbms_aq.dequeue(
    queue_name         => descr.queue_name,
    dequeue_options    => dequeue_options,
    message_properties => message_properties,
    payload            => message,
    msgid              => message_handle
  );

  /* Add the INSERT into an output table. This is so we can see it working.
     In reality, the logic to execute the process would be here */
  -- Explicit column list so the statement keeps working if output_table
  -- gains columns (assumes the target column is named msg, as used by the
  -- other INSERT into this table in this file).
  INSERT INTO output_table (msg)
  VALUES (message.message_body);

  COMMIT;
END p_dequeue;
SQL> BEGIN 2 dbms_aqadm.add_subscriber 3 ( queue_name => 'EXAMPLE_QUEUE', 4 subscriber => sys.aq$_agent('RECIPIENT', NULL, NULL) ); 5 END; 6 / PL/SQL procedure successfully completed. |
SQL> BEGIN 2 dbms_aq.register 3 ( sys.aq$_reg_info_list ( 4 sys.aq$_reg_info ( 'EXAMPLE_QUEUE:RECIPIENT', 5 dbms_aq.namespace_aq, 6 'plsql://p_dequeue', 7 HEXTORAW('FF') 8 ) 9 ), 10 1 11 ); 12* END; SQL> / PL/SQL procedure successfully completed. |
SQL> EXEC p_enqueue('THIS IS A TEST'); PL/SQL procedure successfully completed. SQL> SELECT * FROM output_table; MSG ---------------------------------------------------------------------------------------------------- THIS IS A TEST |
-- Registers three notification targets (a PL/SQL procedure, a mail address
-- and an HTTP URL) for the subscription 'EXAMPLE_QUEUE:RECIPIENT' in a
-- single DBMS_AQ.REGISTER call.
DECLARE
  -- PL/SQL assignment/initialisation uses ":="; a bare "=" here does not compile.
  reginfolist sys.aq$_reg_info_list := sys.aq$_reg_info_list();
BEGIN
  reginfolist.EXTEND(3);

  reginfolist(1) := sys.aq$_reg_info (
                      'EXAMPLE_QUEUE:RECIPIENT',
                      dbms_aq.namespace_aq,
                      'plsql://p_dequeue',
                      HEXTORAW('FF') );

  -- NOTE(review): the address in the mailto target below looks like a
  -- redacted placeholder; substitute a real recipient before running.
  reginfolist(2) := sys.aq$_reg_info (
                      'EXAMPLE_QUEUE:RECIPIENT',
                      dbms_aq.namespace_aq,
                      'mailto://[email protected]',
                      HEXTORAW('FF') );

  reginfolist(3) := sys.aq$_reg_info (
                      'EXAMPLE_QUEUE:RECIPIENT',
                      dbms_aq.namespace_aq,
                      'http://webserver/servlet',
                      HEXTORAW('FF') );

  -- do the registration; the count is taken from the collection so it
  -- cannot drift from the number of entries populated above
  sys.dbms_aq.register(reginfolist, reginfolist.COUNT);
END;
|
SQL> begin 2 dbms_aqadm.stop_queue('EXAMPLE_QUEUE'); 3 dbms_aqadm.drop_queue('EXAMPLE_QUEUE'); 4 dbms_aqadm.drop_queue_table('queue_table'); 5 end; 6 / PL/SQL procedure successfully completed. |
SQL> SELECT 2 a.owner, 3 a.name, 4 a.queue_type, 5 a.queue_table, 6 a.retention, 7 a.enqueue_enabled, 8 a.dequeue_enabled, 9 b.waiting, 10 b.ready, 11 b.expired, 12 b.total_wait, 13 b.average_wait 14 FROM 15 dba_queues a, 16 v$aq b 17 WHERE a.qid = b.qid 18 AND a.name = 'EXAMPLE_QUEUE' 19 / OWNER NAME QUEUE_TYPE QUEUE_TABLE ------------------------------ ------------------------------ -------------------- ----------------- RETENTION ENQUEUE DEQUEUE WAITING READY EXPIRED TOTAL_WAIT AVERAGE_WAIT ---------------------------------------- ------- ------- ---------- ---------- ---------- ---------- ORAUSER EXAMPLE_QUEUE NORMAL_QUEUE QUEUE_TABLE 0 YES YES 0 0 0 0 0 |
State | Value | Explanation |
0 | READY | The message is ready to be processed, i.e. either the message delay time has passed, or there was none specified |
1 | WAITING | The delay specified by message_properties_t.delay has not been reached |
2 | RETAINED or PROCESSED | The message has been successfully dequeued, and will remain in the queue for as long as was specified as the retention time when creating the queue |
3 | EXPIRED | The message has not been successfully dequeued, because either
|
SQL> SELECT userenv('sessionid'), sys_context('USERENV', 'SESSIONID') 2 FROM dual; USERENV('SESSIONID') SYS_CONTEXT('USERENV','SESSIONID') -------------------- --------------------------------------------------- 3006 3006 |
SQL> SELECT sid, serial#, audsid, username, program 2 FROM v$session 3 / SID SERIAL# AUDSID USERNAME PROGRAM -------- ---------- ---------- ---------- ------------------- 1 1 0 ORACLE.EXE 2 1 0 ORACLE.EXE 3 1 0 ORACLE.EXE 4 1 0 ORACLE.EXE 5 1 0 ORACLE.EXE 6 1 0 ORACLE.EXE 7 1 0 ORACLE.EXE 8 1 0 ORACLE.EXE 11 54 0 ORACLE.EXE 13 875 3006 ORAUSER sqlplusw.exe 10 rows selected. |
-- plsqlCallback: callback-style procedure that dequeues the message named in
-- the descriptor, then logs which user/session executed it by inserting
-- "<user> : <sessionid> : <sid>" into output_table.msg.
--
-- Only descr is read (msg_id, consumer_name, queue_name); context, reginfo,
-- payload and payloadl are accepted but not used by the body.
-- Commits after the INSERT.
CREATE OR REPLACE PROCEDURE plsqlCallback (
  context   RAW,
  reginfo   sys.aq$_reg_info,
  descr     sys.aq$_descriptor,
  payload   RAW,
  payloadl  NUMBER
)
IS
  l_deq_opts   dbms_aq.dequeue_options_t;
  l_msg_props  dbms_aq.message_properties_t;
  l_msg_id     RAW(16);
  l_payload    message_type;
BEGIN
  -- Pin the dequeue to the message and consumer given in the descriptor.
  l_deq_opts.msgid         := descr.msg_id;
  l_deq_opts.consumer_name := descr.consumer_name;

  DBMS_AQ.DEQUEUE(
    queue_name         => descr.queue_name,
    dequeue_options    => l_deq_opts,
    message_properties => l_msg_props,
    payload            => l_payload,
    msgid              => l_msg_id
  );

  -- Record the identity of the session running this procedure; a single
  -- row of v$mystat is enough to obtain the current SID.
  INSERT INTO output_table ( msg )
  SELECT user || ' : ' || SYS_CONTEXT('userenv', 'sessionid') || ' : ' || sid
  FROM   v$mystat
  WHERE  rownum = 1;

  COMMIT;
END;
/
SQL> SELECT * 2 FROM output_table; MSG -------------------------------- SYS : 0 : 18 |