Advanced Queuing In Oracle9i
This article is a basic primer to get you started using Advanced Queuing in Oracle9i and Oracle8i:
- Environment Setup
- Define Payload
- Create Queue Table And Queue
- Grant Privilege On Queue
- Enqueue Message
- Dequeue Message
- Variations
Environment Setup
Administration and access privileges for advanced queuing are controlled using two roles:
- AQ_ADMINISTRATOR_ROLE – Allows creation and administration of queuing infrastructure.
- AQ_USER_ROLE – Allows access to queues for enqueue and dequeue operations.
In the following examples I have used two schemas, one owning the queuing infrastructure and another with access to it:
CONNECT system/password@tsh9i
CREATE USER aq_admin IDENTIFIED BY aq_admin DEFAULT TABLESPACE users;
GRANT connect TO aq_admin;
GRANT create type TO aq_admin;
GRANT aq_administrator_role TO aq_admin;
ALTER USER aq_admin QUOTA UNLIMITED ON users;
CREATE USER aq_user IDENTIFIED BY aq_user DEFAULT TABLESPACE users;
GRANT connect TO aq_user;
GRANT aq_user_role TO aq_user;
Define Payload
The content, or payload, of a message is defined using an object type which must be defined before a queue is created:
CONNECT aq_admin/aq_admin@tsh9i
CREATE OR REPLACE TYPE event_msg_type AS OBJECT (
name VARCHAR2(10),
current_status NUMBER(5),
next_status NUMBER(5)
);
/
GRANT EXECUTE ON event_msg_type TO aq_user;
Create Queue Table And Queue
Once the payload type is created the queuing infrastructure can be created. Queues are implemented using a queue table which can hold multiple queues with the same payload type. First the queue table must be defined using the payload type, then the queue can be defined and started. These operations are all performed using the DBMS_AQADM package:
CONNECT aq_admin/aq_admin@tsh9i
EXECUTE DBMS_AQADM.create_queue_table ( -
queue_table => 'aq_admin.event_queue_tab', -
queue_payload_type => 'aq_admin.event_msg_type');
EXECUTE DBMS_AQADM.create_queue ( -
queue_name => 'aq_admin.event_queue', -
queue_table => 'aq_admin.event_queue_tab');
EXECUTE DBMS_AQADM.start_queue ( -
queue_name => 'aq_admin.event_queue', -
enqueue => TRUE);
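One way to confirm that the queue was created and started is to query the USER_QUEUES dictionary view as the owning user. This is only a quick check, assuming the objects were created exactly as shown above; the exception queue that is created along with the queue table should be listed as well:

```sql
CONNECT aq_admin/aq_admin@tsh9i

-- List the queues owned by the current user and show whether
-- enqueue and dequeue operations are currently enabled on each.
SELECT name, queue_table, queue_type, enqueue_enabled, dequeue_enabled
FROM   user_queues
ORDER  BY name;
```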
Grant Privilege On Queue
The DBMS_AQADM package is also used to grant privileges on queues so that other users can access them:
CONNECT aq_admin/aq_admin@tsh9i
EXECUTE DBMS_AQADM.grant_queue_privilege ( -
privilege => 'ALL', -
queue_name => 'aq_admin.event_queue', -
grantee => 'aq_user', -
grant_option => FALSE);
At this point the queue can be used for enqueue and dequeue operations by the AQ_USER user.
Enqueue Message
Messages can be written to the queue using the DBMS_AQ.ENQUEUE procedure:
CONNECT aq_user/aq_user@tsh9i
DECLARE
l_enqueue_options DBMS_AQ.enqueue_options_t;
l_message_properties DBMS_AQ.message_properties_t;
l_message_handle RAW(16);
l_event_msg AQ_ADMIN.event_msg_type;
BEGIN
l_event_msg := AQ_ADMIN.event_msg_type('REPORTER', 1, 2);
DBMS_AQ.enqueue(queue_name => 'aq_admin.event_queue',
enqueue_options => l_enqueue_options,
message_properties => l_message_properties,
payload => l_event_msg,
msgid => l_message_handle);
COMMIT;
END;
/
Dequeue Message
Messages can be read from the queue using the DBMS_AQ.DEQUEUE procedure:
CONNECT aq_user/aq_user@tsh9i
SET SERVEROUTPUT ON
DECLARE
l_dequeue_options DBMS_AQ.dequeue_options_t;
l_message_properties DBMS_AQ.message_properties_t;
l_message_handle RAW(16);
l_event_msg AQ_ADMIN.event_msg_type;
BEGIN
DBMS_AQ.dequeue(queue_name => 'aq_admin.event_queue',
dequeue_options => l_dequeue_options,
message_properties => l_message_properties,
payload => l_event_msg,
msgid => l_message_handle);
DBMS_OUTPUT.put_line ('Event Name : ' || l_event_msg.name);
DBMS_OUTPUT.put_line ('Event Current Status: ' || l_event_msg.current_status);
DBMS_OUTPUT.put_line ('Event Next Status : ' || l_event_msg.next_status);
COMMIT;
END;
/
Variations
The ENQUEUE_OPTIONS_T, DEQUEUE_OPTIONS_T and MESSAGE_PROPERTIES_T types can be used to vary the way messages are enqueued and dequeued. This is where the real flexibility of advanced queuing becomes evident. A full discussion of these options is beyond the scope of this article, but they are covered in the following links:
Enqueue Options Type
Dequeue Options Type
Message Properties Type
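As a small illustration of the dequeue options, the sketch below browses the next message without removing it and returns immediately if the queue is empty. It assumes the queue and payload type created earlier in this article; the exception name e_no_messages is my own choice, mapped to ORA-25228, the timeout error raised by a dequeue that finds nothing:

```sql
CONNECT aq_user/aq_user@tsh9i
SET SERVEROUTPUT ON

DECLARE
  l_dequeue_options    DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_event_msg          AQ_ADMIN.event_msg_type;

  -- ORA-25228: timeout or end-of-fetch during message dequeue.
  -- The exception name is illustrative, not part of DBMS_AQ.
  e_no_messages EXCEPTION;
  PRAGMA EXCEPTION_INIT(e_no_messages, -25228);
BEGIN
  -- Return immediately instead of blocking when the queue is empty.
  l_dequeue_options.wait := DBMS_AQ.NO_WAIT;
  -- Read the message but leave it on the queue.
  l_dequeue_options.dequeue_mode := DBMS_AQ.BROWSE;

  DBMS_AQ.dequeue(queue_name         => 'aq_admin.event_queue',
                  dequeue_options    => l_dequeue_options,
                  message_properties => l_message_properties,
                  payload            => l_event_msg,
                  msgid              => l_message_handle);

  DBMS_OUTPUT.put_line('Browsed event: ' || l_event_msg.name);
EXCEPTION
  WHEN e_no_messages THEN
    DBMS_OUTPUT.put_line('No messages waiting.');
END;
/
```

Because the message is only browsed, no COMMIT is needed and a later dequeue in the default REMOVE mode will still find it.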
For further information see:
Application Developer’s Guide – Advanced Queuing
DBMS_AQADM Package
DBMS_AQ Package
http://www.peacetech.com/flipper/oracle9i/901_doc/appdev.901/a88890/toc.htm
OBJECTS CREATED WHEN CREATING A QUEUE TABLE [ID 224027.1]
Modified 11-SEP-2008 Type BULLETIN Status PUBLISHED
In this Document
Purpose
Scope and Application
OBJECTS CREATED WHEN CREATING A QUEUE TABLE
Objects always created
Objects created only for multiple consumers queue tables
Objects specific prior to release 9iR2
References
Applies to:
Oracle Server – Enterprise Edition – Version: 8.1.7.0 to 11.1.0.6
Oracle Server – Personal Edition – Version: 8.1.7.0 to 11.1.0.6
Information in this document applies to any platform.
Purpose
The purpose of this document is to give the reader an understanding of the objects created at database level when running the DBMS_AQADM.CREATE_QUEUE_TABLE API.
Note that DBMS_AQADM.CREATE_QUEUE_TABLE has different options, so some objects are created or not depending on the options used.
It does not describe the in-memory structures used to manage the movement of messages.
Scope and Application
The information shown here does not apply to release 8.0, nor to queue tables created with the compatible parameter set to '8.0'.
Because the objects created vary across releases, release-specific comments appear throughout the document.
OBJECTS CREATED WHEN CREATING A QUEUE TABLE
Objects always created
- <QUEUE_TABLE_NAME>, TABLE. The queue table itself. It contains one row per message in the queue table.
- AQ$_<QUEUE_TABLE_NAME>_E, QUEUE. The default exception queue for any queue defined on the queue table.
- AQ$<QUEUE_TABLE_NAME>, VIEW. The definition of the view depends on whether the queue table is a single consumer or a multiple consumer queue table. It contains one row per message and subscriber/recipient.
- AQ$_<QUEUE_TABLE_NAME>_I, created as an INDEX when the queue table is a single consumer queue table and as an Index-Organized Table (IOT) when it is a multi-consumer table. It is used to control dequeue operations. For multi-consumer queue tables it contains one row per message not yet consumed, per subscriber/recipient.
- AQ$_<QUEUE_TABLE_NAME>_T, created as an INDEX when the queue table is a single consumer queue table and as an IOT when it is a multi-consumer table. It is used for time-based operations, is managed by the QMON process, and contains one row per message requiring work.
- AQ$_<QUEUE_TABLE_NAME>_F, VIEW. Created to manage conditional dequeues. This view is only created in release 10g and later.
- A LOB segment with a system-generated name, associated with the USER_PROP column of the queue table. This has existed since release 10g.
- A LOB segment with a system-generated name, associated with the USER_DATA column of the queue table. It is created only if the payload for the queue table has been defined as RAW or SYS.ANYDATA.
IOTs appear in the dba_objects view with type TABLE, and each has an associated INDEX object named SYS_IOT_TOP_<NUMBER>, where <NUMBER> is the object_id of the table. Additionally, an object called SYS_IOT_OVER_<NUMBER> may be created if the IOT has been defined with overflow.
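To see which of these objects exist for a particular queue table, the data dictionary can be queried directly. The sketch below assumes a queue table named EVENT_QUEUE_TAB owned by the connected user (the name is borrowed from the earlier example and is only an illustration). Objects with system-generated names, such as the SYS_IOT_TOP_<NUMBER> indexes and the LOB segments, do not contain the queue table name and will not be matched by this filter:

```sql
-- List the queue table and the AQ$ objects derived from it.
-- EVENT_QUEUE_TAB is an assumed queue table name; substitute your own.
SELECT object_name, object_type
FROM   user_objects
WHERE  object_name = 'EVENT_QUEUE_TAB'
OR     object_name LIKE 'AQ$%EVENT_QUEUE_TAB%'
ORDER  BY object_type, object_name;
```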
Objects created only for single consumer queue tables
- An INDEX with a system-generated name, defined on the MSGID column of the queue table.
Objects created only for multiple consumers queue tables
- AQ$_<QUEUE_TABLE_NAME>_S, TABLE. Keeps information about the subscribers.
- AQ$_<QUEUE_TABLE_NAME>_H, IOT. Keeps historical data and contains one row per message per consumer.
- AQ$_<QUEUE_TABLE_NAME>_N, SEQUENCE. Used to assign subscriber ids.
- AQ$<QUEUE_TABLE_NAME>_S, VIEW. Based on AQ$_<QUEUE_TABLE_NAME>_S, it references subscribers and transformations.
- AQ$_<QUEUE_TABLE_NAME>_V, EVALUATION CONTEXT. Required to evaluate the rules assigned to subscribers properly. Only available since 9.2.
- AQ$_<QUEUE_TABLE_NAME>_G, IOT. Called AQ$_<queue_table_name>_NR in release 9i and not present in release 8.1. It is the signature IOT and currently has no implementation.
- AQ$_<QUEUE_TABLE_NAME>_C, IOT. Used with transactionally grouped multi-consumer queues with a commit time order.
Starting with release 9.2.0, if the queue table type is ANYDATA, a buffered queue is created in memory. The messages enqueued in the buffered queue are staged in a memory buffer associated with the queue; they are not ordinarily written to disk.
If they have been staged in the buffer for a period of time without being dequeued, or if there is not enough space in memory to hold all the messages, then they may be spilled to disk, and the following objects are created:
- AQ$_<QUEUE_TABLE_NAME>_P, TABLE. To store the messages that spill from memory.
- AQ$_<QUEUE_TABLE_NAME>_D, TABLE. To store information about the consumers that are eligible for processing each event.
Objects specific prior to release 9iR2
- AQ$_<QUEUE_TABLE_NAME>_R, TABLE. Contains information about the rule metadata of the subscribers. It was subsumed into AQ$_<QUEUE_TABLE_NAME>_S in release 9iR2.
- AQ$<QUEUE_TABLE_NAME>_R, VIEW. Joins the information in the AQ$_<QUEUE_TABLE_NAME>_R and AQ$_<QUEUE_TABLE_NAME>_S tables.
- A LOB SEGMENT with a system-generated name, used to store the HISTORY column of the queue table.
- An INDEX with a system-generated name on the MSGID column of the queue table, for multiple consumer queue tables.
RELATED DOCUMENTS
-----------------
– Oracle9i – Supplied PL/SQL Packages and Types Reference
– Oracle9i – Application Developer’s Guide – Advanced Queuing
– Oracle Streams Advanced Queuing User’s Guide and Reference 10g Release 2 (10.2)
References
NOTE:203225.1 – How to Manually Cleanup Advanced Queuing Tables
NOTE:267933.1 – Advanced Queuing Knowledge Browser Product Page
NOTE:230901.1 – What are Streams Queue Buffers?
I asked Sean Dillon, a technologist on my team who knows AQ, to take a look at this and here’s what
he had to say:
---------
So there are a few things we can address. First of all, queue propagation doesn’t happen
immediately. There is a bit of delay when you enqueue the first message into a queue that has an
enabled queue schedule. Once the first message goes, it’s very fast because the job queue process
is waiting for the next message. So the script above, even if everything was correct, would not
see the message get propagated because you immediately stop and drop the queues before they have an
opportunity to propagate.
When deciding how you will propagate a message from queue A to queue B, you have a few
choices. You can specify a subscriber on the queue, which means all messages enqueued to the
queue should be sent; you can specify a rule-based subscriber on the queue, which means messages
that conform to the rule will be sent; or you can specify a recipient list in the message itself.
In your example above, you are specifying a recipient list. This means the only way Oracle knows
who the message is intended for is via the recipient list. You can schedule propagation (as you
have done), but this doesn’t mean all the messages will be propagated (as you have seen :-)).
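For comparison, the two subscriber-based choices could look something like the sketch below. It is not part of Sean's script: it assumes the MSG_QUEUE and MSG_QUEUEX queues and the aqtest.loopback database link used in the example that follows, and the subscriber names and the 'URGENT' subject are made up for illustration. Bear in mind that a recipient list on a message takes precedence over the queue's subscribers, so these would only apply to messages enqueued without one.

```sql
BEGIN
  -- Plain subscriber: every message enqueued to MSG_QUEUE (without a
  -- recipient list) is delivered to the agent's address.
  -- 'ALL_MSGS' is a hypothetical subscriber name.
  DBMS_AQADM.add_subscriber(
    queue_name => 'aqtest.msg_queue',
    subscriber => SYS.aq$_agent('ALL_MSGS',
                                'aqtest.MSG_QUEUEX@aqtest.loopback',
                                NULL));

  -- Rule-based subscriber: only messages whose payload satisfies the
  -- rule are delivered. 'URGENT_MSGS' and 'URGENT' are hypothetical.
  DBMS_AQADM.add_subscriber(
    queue_name => 'aqtest.msg_queue',
    subscriber => SYS.aq$_agent('URGENT_MSGS',
                                'aqtest.MSG_QUEUEX@aqtest.loopback',
                                NULL),
    rule       => 'tab.user_data.subject = ''URGENT''');
END;
/
```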
What I have done is added a parameter to your enqueue_msg procedure for the agent address. When
you are instantiating an aq$_agent for the recipient_list, I pass this parameter in the ADDRESS
field of the aq$_agent object. This lets you enqueue to a null address or to a remote queue.
Then, when I enqueue the 'This is a test....' message, it will never be propagated because its
agent address is NULL. When I enqueue the 'This should be propagated.' message, it WILL BE
propagated because I pass 'aqtest.MSG_QUEUEX@aqtest.loopback' as the address.
Here’s the complete example, with edits:
----------------------
-- SMD: I moved this up so my queues would exist AFTER the example had run.
-- SMD: it will fail the first time you run this script.
connect aqtest/aqtest
begin
DBMS_AQADM.stop_queue('MSG_QUEUE');
DBMS_AQADM.drop_queue('MSG_QUEUE');
DBMS_AQADM.drop_queue_table('aqtest.objmsgs80_qtab');
DBMS_AQADM.stop_queue('MSG_QUEUEX');
DBMS_AQADM.drop_queue('MSG_QUEUEX');
DBMS_AQADM.drop_queue_table('aqtest.objmsgs80_qtabX');
end;
/
connect sys/o9isb9 as sysdba
drop user aqtest cascade;
CREATE USER aqtest IDENTIFIED BY aqtest;
GRANT CONNECT, RESOURCE, aq_administrator_role TO aqtest;
GRANT EXECUTE ON dbms_aq TO aqtest;
GRANT EXECUTE ON dbms_aqadm TO aqtest;
begin
dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','AQTEST',FALSE);
dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','AQTEST',FALSE);
end;
/
connect aqtest/aqtest
CREATE type aqtest.Message_typ as object(subject VARCHAR2(30), text VARCHAR2(80));
/
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aqtest.objmsgs80_qtab',
queue_payload_type => 'aqtest.Message_typ',
multiple_consumers => TRUE);
DBMS_AQADM.CREATE_QUEUE(queue_name => 'MSG_QUEUE',
queue_table => 'aqtest.objmsgs80_qtab');
DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUE');
end;
/
-- ADDED: Setup additional queue to propagate messages to
begin
DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => 'aqtest.objmsgs80_qtabX',
queue_payload_type => 'aqtest.Message_typ',
multiple_consumers => TRUE);
DBMS_AQADM.CREATE_QUEUE(queue_name => 'MSG_QUEUEX',
queue_table => 'aqtest.objmsgs80_qtabX');
DBMS_AQADM.START_QUEUE(queue_name => 'MSG_QUEUEX');
end;
/
-- SMD: you'll see there is now a second parameter, as discussed above:
create or replace procedure enqueue_msg(p_msg in varchar2,
p_add in varchar2 default null)
as
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message aqtest.message_typ;
recipients DBMS_AQ.aq$_recipient_list_t;
BEGIN
-- ADDED
-- SMD: here's where the parameter is used
recipients(1) := SYS.aq$_agent('RECIPIENT', p_add, null);
message_properties.recipient_list := recipients;
message := message_typ('NORMAL MESSAGE', p_msg );
dbms_aq.enqueue(queue_name => 'msg_queue',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
end;
/
-- SMD: this msg is meant for THIS queue and will not be propagated.
begin enqueue_msg('This is a test....'); commit; end;
/
-- ADDED: Create loopback database link
create database link aqtest.LoopBack connect to aqtest identified by aqtest using
'funk92.us.oracle.com';
-- ADDED: Setup scheduling for messages
begin DBMS_AQADM.Schedule_Propagation(Queue_Name => 'MSG_QUEUE',
Destination => 'aqtest.LoopBack',
Start_Time => sysdate,
Latency => 0);
end;
/
-- Check scheduling: Everything checked out OK.
select * from user_queue_schedules;
-- SMD: this msg is meant for the MSG_QUEUEX queue and WILL BE propagated.
begin enqueue_msg('This should be propagated.', 'aqtest.MSG_QUEUEX@aqtest.LoopBack'); commit; end;
/
-- Check scheduling: Neither error nor action reported
select * from user_queue_schedules;
----------------------------------
I think this is what you’re looking for:
aqtest@FUNK92> select t1.cnt, t2.cnt
2 from (select count(*) cnt from objmsgs80_qtab) t1,
3 (select count(*) cnt from objmsgs80_qtabx) t2
4 /
CNT CNT
---------- ----------
1 1
Hope that helps!
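To read the propagated message back out of the destination queue, something like the following sketch should work. It is my own addition rather than part of Sean's answer, and it assumes the script above has run and propagation has completed. Because MSG_QUEUEX is a multiple consumer queue, the dequeue has to name a consumer; here that is the 'RECIPIENT' agent name used in the recipient list. The 10 second wait is an arbitrary choice.

```sql
SET SERVEROUTPUT ON

DECLARE
  l_dequeue_options    DBMS_AQ.dequeue_options_t;
  l_message_properties DBMS_AQ.message_properties_t;
  l_message_handle     RAW(16);
  l_message            aqtest.message_typ;
BEGIN
  -- A multiple consumer queue requires a consumer name; this is the
  -- agent name that enqueue_msg placed in the recipient list.
  l_dequeue_options.consumer_name := 'RECIPIENT';
  -- Wait at most 10 seconds (arbitrary); ORA-25228 is raised on timeout.
  l_dequeue_options.wait := 10;

  DBMS_AQ.dequeue(queue_name         => 'aqtest.msg_queuex',
                  dequeue_options    => l_dequeue_options,
                  message_properties => l_message_properties,
                  payload            => l_message,
                  msgid              => l_message_handle);

  DBMS_OUTPUT.put_line(l_message.subject || ': ' || l_message.text);
  COMMIT;
END;
/
```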