advanced queues concepts – new oracle dba career





Advanced Queuing In Oracle9i

This article is a basic primer to get you started using Advanced Queuing in Oracle9i and Oracle8i:

Top of Form

Environment Setup

Administration and access privileges for advanced queuing are controled using two roles:

  • AQ_ADMINISTRATOR_ROLE – Allows creation and administration of queuing infrastructure.
  • AQ_USER_ROLE – Allows access to queues for enqueue and dequeue operations.

In the following examples I have used two schemas, one owning the queuing infrastructure and another with access to it:

CONNECT system/password@tsh9i

CREATE USER aq_admin IDENTIFIED BY aq_admin DEFAULT TABLESPACE users;

GRANT connect TO aq_admin;

GRANT create type TO aq_admin;

GRANT aq_administrator_role TO aq_admin;

ALTER USER aq_admin QUOTA UNLIMITED ON users;

CREATE USER aq_user IDENTIFIED BY aq_user DEFAULT TABLESPACE users;

GRANT connect TO aq_admin;

GRANT aq_user_role TO aq_user;

Define Payload

The content, or payload, of a message is defined using an object type which must be defined before a queue is created:

CONNECT aq_admin/aq_admin@tsh9i

CREATE OR REPLACE TYPE event_msg_type AS OBJECT (

name VARCHAR2(10),

current_status NUMBER(5),

next_status NUMBER(5)

);

/

GRANT EXECUTE ON event_msg_type TO aq_user;

Create Queue Table And Queue

Once the payload type is created the queuing infrastructure can be created. Queues are implemented using a queue table which can hold multiple queues with the same payload type. First the queue table must be defined using the payload type, then the queue can be defined and started. These operations are all performed using the DBMS_AQADM package:

CONNECT aq_admin/aq_admin@tsh9i

EXECUTE DBMS_AQADM.create_queue_table ( –

queue_table => ‘aq_admin.event_queue_tab’, –

queue_payload_type => ‘aq_admin.event_msg_type’);

EXECUTE DBMS_AQADM.create_queue ( –

queue_name => ‘aq_admin.event_queue’, –

queue_table => ‘aq_admin.event_queue_tab’);

EXECUTE DBMS_AQADM.start_queue ( –

queue_name => ‘aq_admin.event_queue’, –

enqueue => TRUE);

Grant Privilege On Queue

The DBMS_AQADM package is also used to grant privileges on queues so that other users can access them:

CONNECT aq_admin/aq_admin@tsh9i

EXECUTE DBMS_AQADM.grant_queue_privilege ( –

privilege => ‘ALL’, –

queue_name => ‘aq_admin.event_queue’, –

grantee => ‘aq_user’, –

grant_option => FALSE);

At this point the queue can be used for enqueue and dequeue operations by the AQ_USER user.

Enqueue Message

Messages can be written to the queue using the DBMS_AQ.ENQUEUE procedure:

CONNECT aq_user/aq_user@tsh9i

DECLARE

l_enqueue_options DBMS_AQ.enqueue_options_t;

l_message_properties DBMS_AQ.message_properties_t;

l_message_handle RAW(16);

l_event_msg AQ_ADMIN.event_msg_type;

BEGIN

l_event_msg := AQ_ADMIN.event_msg_type(‘REPORTER’, 1, 2);

DBMS_AQ.enqueue(queue_name => ‘aq_admin.event_queue’,

enqueue_options => l_enqueue_options,

message_properties => l_message_properties,

payload => l_event_msg,

msgid => l_message_handle);

COMMIT;

END;

/

Dequeue Message

Messages can be read from the queue using the DBMS_AQ.DEQUEUE procedure:

CONNECT aq_user/aq_user@tsh9i

SET SERVEROUTPUT ON

DECLARE

l_dequeue_options DBMS_AQ.dequeue_options_t;

l_message_properties DBMS_AQ.message_properties_t;

l_message_handle RAW(16);

l_event_msg AQ_ADMIN.event_msg_type;

BEGIN

DBMS_AQ.dequeue(queue_name => ‘aq_admin.event_queue’,

dequeue_options => l_dequeue_options,

message_properties => l_message_properties,

payload => l_event_msg,

msgid => l_message_handle);

DBMS_OUTPUT.put_line (‘Event Name : ‘ || l_event_msg.name);

DBMS_OUTPUT.put_line (‘Event Current Status: ‘ || l_event_msg.current_status);

DBMS_OUTPUT.put_line (‘Event Next Status : ‘ || l_event_msg.next_status);

COMMIT;

END;

/

Variations

The DEQUEUE_OPTIONS_T, DEQUEUE_OPTIONS_T and MESSAGE_PROPERTIES_T types can be used to vary the way messages are enqueued and dequeued. This is where the real flexibility of advanced queuing becomes evident. The discussion of these options is beyond the scope of this article but they are discussed in the following links:

Enqueue Options Type
Dequeue Options Type
Message Properties Type

For further information see:

Application Developer’s Guide – Advanced Queuing
DBMS_AQADM Package
DBMS_AQ Package

http://www.peacetech.com/flipper/oracle9i/901_doc/appdev.901/a88890/toc.htm

OBJECTS CREATED WHEN CREATING A QUEUE TABLE [ID 224027.1]  
 
  Modified 11-SEP-2008     Type BULLETIN     Status PUBLISHED  

In this Document
Purpose
Scope and Application
OBJECTS CREATED WHEN CREATING A QUEUE TABLE
Objects always created
Objects created only for multiple consumers queue tables
Objects specific prior to release 9iR2
References

Applies to:

Oracle Server – Enterprise Edition – Version: 8.1.7.0 to 11.1.0.6
Oracle Server – Personal Edition – Version: 8.1.7.0 to 11.1.0.6
Information in this document applies to any platform.

Purpose

The purpose of this document is to give the reader an understanding of the objects created at database level and when running the API DBMS_AQ_ADM.CREATE_QUEUE_TABLE.

Note that API DBMS_AQ_ADM.CREATE_QUEUE_TABLE have different options and therefore some objects are created or not depending on the options used.

It does not describe structures that are supported in memory. To manage the movement of messages.

Scope and Application

The information show here is not applicable for releases 8.0 nor for those queue tables created with compatible parameter set to ‘8.0’.

Due the objects created vary across releases, some specific comments will appear across the document.

OBJECTS CREATED WHEN CREATING A QUEUE TABLE

Objects always created

  • <QUEUE_TABLE_NAME>, TABLE. Is the queue table itself and will contain 1 row per message in the queue table.
  • AQ$_<QUEUE_TABLE_NAME>_E, QUEUE. It is default exception queue for any queue defined on the queue table.
  • AQ$<QUEUE_TABLE_NAME>, VIEW. The definition of the view depends if the queue table is a single consumer or a multiple consumer queue table. It will contain one row per message and subscriber/recipient.
  • AQ$_<QUEUE_TABLE_NAME>_I,  INDEX when the queue table is a single consumer queue table and as Index-Organized Table (IOT) when it is a multi-consumer table. It is used to control dequeuing operations, in the case of multiconsumer queue tables will contain 1 row per message not consumer per subscriber/recipient.
  • AQ$_<QUEUE_TABLE_NAME>_T, INDEX when the queue table is a single consumer queue table and as an IOT when it is a multi-consumer table. It is used for Time based operations and managed by QMON process and will contain 1 row per message requiring work.
  • AQ$_<QUEUE_TABLE_NAME>_F, VIEW . Created to manage conditional dequeues. This view is only created on releases 10G and upwards.
  • System name LOB segment is generated associated to column USER_PROP of the queue table. This exists since releases 10G
  • System name LOB segment is generated if the payload for the queue table has been defined as RAW or SYS.ANYDATA and it is associated to USER_DATA column of the queue table.

IOTs objects appears on dba_objects view as type TABLE and it has an associated an INDEX object with the name SYS_IOT_TOP_<NUMBER> where <NUMBER> is the object_id of the table. Additionally an object called SYS_IOT_OVER_<NUMBER> could be created if the IOT has been defined with overflow.
Objects created only for single consumer queue tables

  • System generated name INDEX defined on column MSGID of the queue table.

Objects created only for multiple consumers queue tables

  • AQ$_<QUEUE_TABLE_NAME>_S, TABLE. Keep information about the subscribers.
  • AQ$_<QUEUE_TABLE_NAME>_H, IOT. Keep historical data and contains one row per message per consumer.
  • AQ$_<QUEUE_TABLE_NAME>_N, SEQUENCE, used to assign subscriber ids.
  • AQ$<QUEUE_TABLE_NAME>_S, VIEW. Based on AQ$_<QUEUE_TABLE_NAME>_S, it references subscribers and transformations.
  • AQ$_<QUEUE_TABLE_NAME>_V, EVALUATION CONTEXT, required to evaluate rules assigned to subscribed properly. Only available since 9.2.
  • AQ$_<QUEUE_TABLE_NAME>_G, IOT. Called AQ$_<queue_table_name>_NR on releases 9i and not existing on releases 8.1. It is the signature IOT and do not have implementation currently.
  • AQ$_<QUEUE_TABLE_NAME>_C, IOT. Used with transactionally grouped Multi-consumer Queues with a commit time order

Starting on release 9.2.0, if the queue table type is ANYDATA, a buffered queue is created in memory. The messages enqueued in the buffered queue are staged in a memory buffer associated with the queue; they are not ordinarily written to disk.

If they have been staged in the buffer for a period of time without being dequeued, or if there is not enough space in memory to hold all the messages, then they may be
spilled to disk to and the following objects are created:

  • AQ$_<QUEUE_TABLE_NAME>_P, TABLE. To store the messages that spill from memory.
  • AQ$_<QUEUE_TABLE_NAME>_D, TABLE. To store information about the consumers that are eligible for processing each event.

Objects specific prior to release 9iR2

  • AQ$_<QUEUE_TABLE_NAME>_R, TABLE. Contains information regarding the rules metada of the subscriber. It was subsumed into AQ$_<QUEUE_TABLE_NAME>_S in release 9iR2.
  • AQ<QUEUE_TABLE_NAME>_R, VIEW. That joins the information of AQ$_<QUEUE_TABLE_NAME>_R and AQ$_<QUEUE_TABLE_NAME>_S tables.
  • System generated name LOB SEGMENT to store HISTORY column of the queue table.
  • System generated name INDEX on queue table column MSGID for multiple consumer queue

RELATED DOCUMENTS
—————–

– Oracle9i – Supplied PL/SQL Packages and Types Reference
– Oracle9i – Application Developer’s Guide – Advanced Queuing
– Oracle Streams Advanced Queuing User’s Guide and Reference 10g Release 2 (10.2)

References

NOTE:203225.1 – How to Manually Cleanup Advanced Queuing Tables
NOTE:267933.1 – Advanced Queuing Knowledge Browser Product Page
NOTE:230901.1 – What are Streams Queue Buffers?

Related

Products

  • Oracle Database Products > Oracle Database > Oracle Database > Oracle Server – Enterprise Edition
  • Oracle Database Products > Oracle Database > Oracle Database > Oracle Server – Personal Edition

Keywords

DBMS_AQADM.CREATE_QUEUE_TABLE

asked Sean Dillon, a technologist on my team who knows AQ, to take a look at this and here’s what

he had to say:

———

So there are few things we can address. First of all, queue propagation doesn’t happen

immediately. There is a bit of delay when you enqueue the first message into a queue that has an

enabled queue schedule. Once the first message goes, it’s very fast because the job queue process

is waiting for the next message. So the script above, even if everything was correct, would not

see the message get propagated because you immediately stop and drop the queues before they have an

opportunity to propagate.

When deciding how you will propagate a message from queue A to queue B, you have a couple of

choices. You can either specify a subscriber on the queue, which means all message queued to the

message should be send, you can specify a rule-based subscriber on the queue, which means messages

that conform to the rule will be sent, or you can specify a recipient list in the message itself.

In your example above, you are specifying a recipient list. This means the only way Oracle knows

who the message is intended for is via the recipient list. You can schedule propagation (as you

have done), but this doesn’t mean all the messages will be propagated (as you have seen :-)).

What I have done is added a parameter to your enqueue_msg procedure for the agent address. When

you are instantiating an aq$_agent for the recipient_list, I pass this parameter in the ADDRESS

field of the aq$_agent object. This lets you enqueue to a null address or to a remote queue.

Then, when I enqueue the ‘This is a test….’ message, it will never be propagated because it’s

agent address is NULL. When I enqueue the ‘This should be propagated.’ message, it WILL BE

propagated because I pass ‘aqtest.MSG_QUEUEX@aqtest.loopback’ as the address.

Here’s the complete example, with edits:

———————-

— SMD: i moved this up so my queues would exists AFTER the example had run.

— SMD: it will fail the first time you run this script.

connect aqtest/aqtest

begin

DBMS_AQADM.stop_queue(‘MSG_QUEUE’);

DBMS_AQADM.drop_queue(‘MSG_QUEUE’);

DBMS_AQADM.drop_queue_table(‘aqtest.objmsgs80_qtab’);

DBMS_AQADM.stop_queue(‘MSG_QUEUEX’);

DBMS_AQADM.drop_queue(‘MSG_QUEUEX’);

DBMS_AQADM.drop_queue_table(‘aqtest.objmsgs80_qtabX’);

end;

/

connect sys/o9isb9 as sysdba

drop user aqtest cascade;

CREATE USER aqtest IDENTIFIED BY aqtest;

GRANT CONNECT, RESOURCE, aq_administrator_role TO aqtest;

GRANT EXECUTE ON dbms_aq TO aqtest;

GRANT EXECUTE ON dbms_aqadm TO aqtest;

begin

dbms_aqadm.grant_system_privilege(‘ENQUEUE_ANY’,’AQTEST’,FALSE);

dbms_aqadm.grant_system_privilege(‘DEQUEUE_ANY’,’AQTEST’,FALSE);

end;

/

connect aqtest/aqtest

CREATE type aqtest.Message_typ as object(subject VARCHAR2(30), text VARCHAR2(80));

/

begin

DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => ‘aqtest.objmsgs80_qtab’,

queue_payload_type => ‘aqtest.Message_typ’,

multiple_consumers => TRUE);

DBMS_AQADM.CREATE_QUEUE(queue_name => ‘MSG_QUEUE’,

queue_table => ‘aqtest.objmsgs80_qtab’);

DBMS_AQADM.START_QUEUE(queue_name => ‘MSG_QUEUE’);

end;

/

— ADDED: Setup additional queue to propagate messages to

begin

DBMS_AQADM.CREATE_QUEUE_TABLE(queue_table => ‘aqtest.objmsgs80_qtabX’,

queue_payload_type => ‘aqtest.Message_typ’,

multiple_consumers => TRUE);

DBMS_AQADM.CREATE_QUEUE(queue_name => ‘MSG_QUEUEX’,

queue_table => ‘aqtest.objmsgs80_qtabX’);

DBMS_AQADM.START_QUEUE(queue_name => ‘MSG_QUEUEX’);

end;

/

— SMD: you’ll see there is now a second parameter, as discussed above:

create or replace procedure enqueue_msg(p_msg in varchar2,

p_add in varchar2 default null)

as

enqueue_options dbms_aq.enqueue_options_t;

message_properties dbms_aq.message_properties_t;

message_handle RAW(16);

message aqtest.message_typ;

recipients DBMS_AQ.aq$_recipient_list_t;

BEGIN

— ADDED

— SMD: here’s where the parameter is used

recipients(1) := SYS.aq$_agent(‘RECIPIENT’, p_add, null);

message_properties.recipient_list := recipients;

message := message_typ(‘NORMAL MESSAGE’, p_msg );

dbms_aq.enqueue(queue_name => ‘msg_queue’,

enqueue_options => enqueue_options,

message_properties => message_properties,

payload => message,

msgid => message_handle);

end;

/

— SMD: this msgs is meant for THIS queue and will not be propagated.

begin enqueue_msg(‘This is a test….’); commit; end;

/

— ADDED: Create loopback database link

create database link aqtest.LoopBack connect to aqtest identified by aqtest using

‘funk92.us.oracle.com’;

— ADDED: Setup scheduling for messages

begin DBMS_AQADM.Schedule_Propagation(Queue_Name => ‘MSG_QUEUE’,

Destination => ‘aqtest.LoopBack’,

Start_Time => sysdate,

Latency => 0);

end;

/

— Check scheduling: Everything checked out OK.

select * from user_queue_schedules;

— SMD: this msgs is meant for the MSG_QUEUEX queue and WILL BE propagated.

begin enqueue_msg(‘This should be propagated.’, ‘aqtest.MSG_QUEUEX@aqtest.LoopBack’); commit; end;

/

— Check scheduling: Neither error nor action reported

select * from user_queue_schedules;

———————————-

I think this is what you’re looking for:

aqtest@FUNK92> select t1.cnt, t2.cnt

2 from (select count(*) cnt from objmsgs80_qtab) t1,

3 (select count(*) cnt from objmsgs80_qtabx) t2

4 /

CNT CNT

———- ———-

1 1

Hope that helps!

Bottom of Form

advanced queues

Author: admin