Oracle9i Application Developer's Guide - Advanced Queuing
Release 2 (9.2)

Part Number A96587-01

8
A Sample Application Using AQ

In Chapter 1, "Introduction to Oracle Advanced Queuing," a messaging system for a hypothetical company, BooksOnLine, was described. This chapter discusses the features of AQ as they are used in the BooksOnLine sample application.

A Sample Application

The operations of a large bookseller, BooksOnLine, are based on an online book ordering system that automates activities across the various departments involved in the sale. The front end of the system is an order entry application used to enter new orders. Incoming orders are processed by an order processing application that validates and records the order. Shipping departments located at regional warehouses are responsible for ensuring that orders are shipped on time. There are three regional warehouses: one serving the East Region, one serving the West Region, and a third warehouse for shipping international orders. After an order is shipped, the order information is routed to a central billing department that handles payment processing. The customer service department, located at a separate site, is responsible for maintaining order status and handling inquiries.

The features of AQ are exemplified within the BooksOnLine scenario to demonstrate the possibilities of AQ technology. A script for the sample code is provided in Appendix C, "Scripts for Implementing BooksOnLine".

General Features of Advanced Queuing

In this section, the following topics are discussed:

System-Level Access Control

Oracle supports system-level access control for all queuing operations, allowing an application designer or DBA to designate users as queue administrators. A queue administrator can invoke AQ administrative and operational interfaces on any queue in the database. This simplifies the administrative work because all administrative scripts for the queues in a database can be managed under one schema. For more information, see "Oracle Enterprise Manager Support".

PL/SQL (DBMS_AQADM Package): Scenario and Code

In the BooksOnLine application, the DBA creates BOLADM, the BooksOnLine Administrator account, as the queue administrator of the database. This allows BOLADM to create, drop, manage, and monitor queues in the database. If PL/SQL packages are needed in the BOLADM schema for applications to enqueue and dequeue, the DBA should grant ENQUEUE_ANY and DEQUEUE_ANY system privileges to BOLADM:

CREATE USER BOLADM IDENTIFIED BY BOLADM; 
GRANT CONNECT, RESOURCE, aq_administrator_role TO BOLADM; 
GRANT EXECUTE ON dbms_aq TO BOLADM; 
GRANT EXECUTE ON dbms_aqadm TO BOLADM; 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','BOLADM',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','BOLADM',FALSE); 

If using the Java AQ API, BOLADM must be granted execute privileges on the DBMS_AQIN package:

GRANT EXECUTE ON DBMS_AQIN to BOLADM; 
    

In the application, AQ propagators populate messages from the Order Entry (OE) schema to the Western Sales (WS), Eastern Sales (ES) and Worldwide Sales (OS) schemas. The WS, ES and OS schemas in turn populate messages to the Customer Billing (CB) and Customer Service (CS) schemas. Hence the OE, WS, ES and OS schemas all host queues that serve as the source queues for the propagators.
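
The propagation paths described above can be pictured as a small directed graph. The following Java sketch is illustrative only: it does not use the AQ API, and the class and method names are hypothetical. It records which schema propagates to which, and derives the set of schemas that host source queues.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/* Illustrative model of the BooksOnLine propagation paths (not AQ API code). */
class PropagationTopology {

    /* Maps each source schema to the schemas it propagates messages to. */
    static Map<String, List<String>> paths() {
        Map<String, List<String>> p = new LinkedHashMap<>();
        p.put("OE", Arrays.asList("WS", "ES", "OS"));
        p.put("WS", Arrays.asList("CB", "CS"));
        p.put("ES", Arrays.asList("CB", "CS"));
        p.put("OS", Arrays.asList("CB", "CS"));
        return p;
    }

    /* Every schema with at least one outgoing path hosts a source queue. */
    static Set<String> sourceSchemas() {
        return new TreeSet<>(paths().keySet());
    }

    public static void main(String[] args) {
        System.out.println("Schemas hosting source queues: " + sourceSchemas());
    }
}
```

In this model the source schemas are OE, WS, ES, and OS. CB and CS appear only as destinations.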

When messages arrive at the destination queues, sessions based on the source queue schema name are used for enqueuing the newly arrived messages into the destination queues. This means that you need to grant schemas of the source queues enqueue privileges to the destination queues.

To simplify administration, all schemas that host a source queue in the BooksOnLine application are granted the ENQUEUE_ANY system privilege:

EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OE',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','WS',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','ES',FALSE); 
EXECUTE dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','OS',FALSE);  
 

To propagate to a remote destination queue, the login user specified in the database link in the address field of the agent structure should either be granted the ENQUEUE ANY QUEUE privilege, or be granted the rights to enqueue to the destination queue. If the login user in the database link also owns the queue tables at the destination, no explicit privilege grant is needed.

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

No example is provided with this release.

Queue-Level Access Control

Oracle supports queue-level access control for enqueue and dequeue operations. This feature allows the application designer to protect queues created in one schema from applications running in other schemas. The application designer needs to grant only minimal access privileges to the applications that run outside the queue schema. The supported access privileges on a queue are ENQUEUE, DEQUEUE and ALL. For more information, see "Oracle Enterprise Manager Support".

Scenario

The BooksOnLine application processes customer billings in its CB and CBADM schemas. CB (Customer Billing) schema hosts the customer billing application, and the CBADM schema hosts all related billing data stored as queue tables.

To protect the billing data, the billing application and the billing data reside in different schemas. The billing application is allowed only to dequeue messages from CBADM_shippedorders_que, the shipped order queue. It processes the messages, and then enqueues new messages into CBADM_billedorders_que, the billed order queue.
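
The access pattern in this scenario can be sketched in plain Java. This is a simplified model rather than AQ code: the class, its methods, and the way ALL is treated as the combination of ENQUEUE and DEQUEUE are assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;

/* Illustrative model of queue-level privileges (not AQ API code). */
class QueueAccess {

    enum Op { ENQUEUE, DEQUEUE }

    /* Grants keyed by "queue/grantee". */
    private final Map<String, EnumSet<Op>> grants = new HashMap<>();

    /* privilege is "ENQUEUE", "DEQUEUE", or "ALL" (modeled as both operations). */
    void grant(String privilege, String queue, String grantee) {
        EnumSet<Op> ops = "ALL".equals(privilege)
                ? EnumSet.allOf(Op.class)
                : EnumSet.of(Op.valueOf(privilege));
        grants.computeIfAbsent(queue + "/" + grantee,
                               k -> EnumSet.noneOf(Op.class)).addAll(ops);
    }

    /* A user outside the queue schema may act only if explicitly granted. */
    boolean allowed(Op op, String queue, String user) {
        EnumSet<Op> ops = grants.get(queue + "/" + user);
        return ops != null && ops.contains(op);
    }

    public static void main(String[] args) {
        QueueAccess acl = new QueueAccess();
        acl.grant("DEQUEUE", "CBADM_shippedorders_que", "CB");
        acl.grant("ENQUEUE", "CBADM_billedorders_que", "CB");
        System.out.println(acl.allowed(Op.DEQUEUE, "CBADM_shippedorders_que", "CB"));
        System.out.println(acl.allowed(Op.ENQUEUE, "CBADM_shippedorders_que", "CB"));
    }
}
```

With these two grants, CB can dequeue from the shipped orders queue but cannot enqueue into it, which is the minimal access the scenario calls for.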

To protect the queues from other illegal operations from the application, the following two grant calls are needed:

PL/SQL (DBMS_AQADM Package): Example Code

/* Grant dequeue privilege on the shipped orders queue to the Customer 
 Billing application. The CB application retrieves orders that are shipped but 
 not billed from the shipped orders queue. */  
EXECUTE dbms_aqadm.grant_queue_privilege(
   'DEQUEUE','CBADM_shippedorders_que', 'CB', FALSE); 
 
/* Grant enqueue privilege on the billed orders queue to Customer Billing 
 application. The CB application is allowed to put billed orders into this 
 queue after processing the orders. */ 
 
EXECUTE dbms_aqadm.grant_queue_privilege(
   'ENQUEUE', 'CBADM_billedorders_que', 'CB', FALSE); 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

public static void grantQueuePrivileges(Connection db_conn)
{
    AQSession  aq_sess;
    AQQueue    sh_queue;
    AQQueue    bi_queue;

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Grant dequeue privilege on the shipped orders queue to the Customer 
           Billing application. The CB application retrieves orders that are 
           shipped but not billed from the shipped orders queue. */ 

        sh_queue = aq_sess.getQueue("CBADM", "CBADM_shippedorders_que");
        
        sh_queue.grantQueuePrivilege("DEQUEUE", "CB", false);
        
        /* Grant enqueue privilege on the billed orders queue to Customer 
           Billing application. The CB application is allowed to put billed 
           orders into this queue after processing the orders. */ 
 
        bi_queue = aq_sess.getQueue("CBADM", "CBADM_billedorders_que");
    
        bi_queue.grantQueuePrivilege("ENQUEUE", "CB", false);
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }

}

Message Format Transformation

You can define transformation mappings between different message payload types. Transformation mappings are defined as SQL expressions that can include PL/SQL functions (including callouts) and Java stored procedures. Only one-to-one message transformations are supported. The transformation engine is tightly integrated with Advanced Queuing to facilitate transformation of messages as they move through the database messaging system. Figure 8-1 shows how transformations are integrated with Advanced Queuing.

Figure 8-1 Transformations Integrated with Advanced Queuing

Transformation mappings can be used during enqueue, dequeue, and propagation operations. To use a transformation at enqueue, the mapping is specified in the enqueue options. To use a transformation at dequeue, the mapping is specified either in the dequeue options or when you add a subscriber. A mapping specified in the dequeue options overrides a mapping specified with ADD_SUBSCRIBER. To use a transformation at propagation, the mapping is specified when you add a subscriber.
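
The precedence rule for dequeue can be stated as a small function. The Java sketch below is an illustrative model only, not AQ code; the class name, the method name, and the mapping name APP.MY_MAPPING are hypothetical.

```java
/* Illustrative model of how the effective dequeue transformation is chosen
   (not AQ API code). */
class TransformationChoice {

    /* Returns the mapping named in the dequeue options if one is given;
       otherwise the mapping given when the subscriber was added, which may
       be null when no transformation applies. */
    static String effective(String dequeueOptionMapping, String subscriberMapping) {
        if (dequeueOptionMapping != null) {
            return dequeueOptionMapping;
        }
        return subscriberMapping;
    }

    public static void main(String[] args) {
        System.out.println(effective("APP.MY_MAPPING", "OS.OE2XML"));
        System.out.println(effective(null, "OS.OE2XML"));
    }
}
```

The first call selects the mapping from the dequeue options. The second falls back to the mapping supplied with the subscriber.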

PL/SQL (DBMS_TRANSFORM Package): Scenario and Code

In the BooksOnLine application, assume that the order type is represented differently in the order entry and the shipping applications.

The order type of the Order Entry application (in schema OE) is as follows:

create or replace type order_typ as object (
        orderno         number,
        status          varchar2(30),
        ordertype       varchar2(30),
        orderregion     varchar2(30),
        custno          number,
        paymentmethod   varchar2(30),
        items           orderitemlist_vartyp,
        ccnumber        varchar2(20),
        order_date      date);

create or replace type customer_typ as object (
        custno          number,
        custid          varchar2(20),
        name            varchar2(100),
        street          varchar2(100),
        city            varchar2(30),
        state           varchar2(2),
        zip             number,
        country         varchar2(100));

create or replace type book_typ as object (
        title           varchar2(100),
        authors         varchar2(100),
        ISBN            varchar2(20),
        price           number);

create or replace type orderitem_typ as object (
        quantity        number,
        item            book_typ,
        subtotal        number);

create or replace type orderitemlist_vartyp as varray (20) of
orderitem_typ;

The order type of the shipping application is defined as follows:

create or replace type order_typ_sh as object (
        orderno         number,
        status          varchar2(30),
        ordertype       varchar2(30),
        orderregion     varchar2(30),
        customer        customer_typ_sh,
        paymentmethod   varchar2(30),
        items           orderitemlist_vartyp,
        ccnumber        varchar2(20),
        order_date      date);

create or replace type customer_typ_sh as object (
        custno          number,
        name            varchar2(100),
        street          varchar2(100),
        city            varchar2(30),
        state           varchar2(2),
        zip             number);

create or replace type book_typ_sh as object (
        title           varchar2(100),
        authors         varchar2(100),
        ISBN            varchar2(20),
        price           number);

create or replace type orderitem_typ_sh as object (
        quantity        number,
        item            book_typ,
        subtotal        number);

create or replace type orderitemlist_vartyp_sh as varray (20) of
orderitem_typ_sh;

The Overseas Shipping application uses a sys.XMLType attribute.
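
To make the difference between the two representations concrete, the following Java sketch maps the Order Entry customer attributes to the shipping customer attributes. It is plain Java written for illustration, not a DBMS_TRANSFORM mapping, and all class and method names are hypothetical. The attribute lists mirror customer_typ and customer_typ_sh as defined above.

```java
/* Illustrative one-to-one mapping between the two customer representations. */
class CustomerMapping {

    /* Mirrors customer_typ in the Order Entry schema. */
    static class Customer {
        long custno;
        String custid, name, street, city, state, country;
        long zip;
    }

    /* Mirrors customer_typ_sh in the shipping schema. */
    static class CustomerSh {
        long custno;
        String name, street, city, state;
        long zip;
    }

    /* Copies the shared attributes. custid and country have no counterpart
       in the shipping type, so they are not carried over. */
    static CustomerSh toShipping(Customer c) {
        CustomerSh s = new CustomerSh();
        s.custno = c.custno;
        s.name = c.name;
        s.street = c.street;
        s.city = c.city;
        s.state = c.state;
        s.zip = c.zip;
        return s;
    }

    public static void main(String[] args) {
        Customer c = new Customer();
        c.custno = 1001;
        c.custid = "A-17";
        c.name = "Sample Customer";
        c.city = "Toronto";
        c.country = "CANADA";
        CustomerSh s = toShipping(c);
        System.out.println(s.custno + " " + s.name + " " + s.city);
    }
}
```

Each source object yields exactly one target object, which matches the one-to-one restriction on message transformations.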

Creating Transformations

You can create transformations in the following ways:

Visual Basic (OO4O): Example Code

No example is provided with this release.

Java (JDBC): Example Code

No example is provided with this release.

Structured Payloads

With Oracle AQ, you can use object types to structure and manage the payload of messages. The object-relational capabilities of Oracle provide a rich set of data types that range from traditional relational data types to user-defined types.

Using strongly typed content, that is, content whose format is defined by an Oracle object type system, makes the following features available:

You can also create payloads that contain Oracle objects with XMLType attributes. These can be used for transmitting and storing messages that contain XML documents. By defining Oracle objects with XMLType attributes, you can do the following:

PL/SQL (DBMS_AQADM Package): Scenario and Code

The BooksOnLine application uses a rich set of data types to model book orders as message content.

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

After creating the types, use JPublisher to generate Java classes that map to the SQL types.

  1. Create an input file "jaqbol.typ" for JPublisher with the following lines:
    TYPE boladm.customer_typ AS Customer
    TYPE boladm.book_typ AS Book
    TYPE boladm.orderitem_typ AS OrderItem
    TYPE boladm.orderitemlist_vartyp AS OrderItemList
    TYPE boladm.order_typ AS Order
    
    
  2. Run JPublisher with the following arguments:
    jpub -input=jaqbol.typ -user=boladm/boladm -case=mixed -methods=false 
    -compatible=CustomDatum 
    
    

This creates the Java classes Customer, Book, OrderItem, OrderItemList, and Order, which map to the SQL object types created earlier.

  3. Load the Java AQ driver and create a JDBC connection:
       public static Connection loadDriver(String user, String passwd) 
       {
          Connection db_conn = null;
          try 
          {
                Class.forName("oracle.jdbc.driver.OracleDriver");
    
             /* your actual hostname, port number, and SID will 
             vary from what follows. Here we use 'dlsun736,' '5521,'
             and 'test,' respectively: */
    
             db_conn =
                      DriverManager.getConnection(
                      "jdbc:oracle:thin:@dlsun736:5521:test", 
                      user, passwd);
    
             System.out.println("JDBC Connection opened "); 
             db_conn.setAutoCommit(false);
                     
             /* Load the Oracle8i AQ driver: */
             Class.forName("oracle.AQ.AQOracleDriver");
    
             System.out.println("Successfully loaded AQ driver ");  
          }
          catch (Exception ex)
          {
             System.out.println("Exception: " + ex); 
             ex.printStackTrace();      
          }  
          return db_conn;
       }

XMLType Queue Payloads

You can create queues with XMLType payloads. These can be used for transmitting and storing messages that contain XML documents. By defining Oracle objects with XMLType attributes, you can do the following:

In the BooksOnLine application, assume that the Overseas Shipping site represents the order as SYS.XMLType. The Order Entry site represents the order as an Oracle object, ORDER_TYP.

The Overseas queue table and queue are created as follows:

BEGIN
dbms_aqadm.create_queue_table(                          
   queue_table        => 'OS_orders_pr_mqtab',            
   comment            => 'Overseas Shipping MultiConsumer Orders queue table', 
   multiple_consumers => TRUE,                             
   queue_payload_type => 'SYS.XMLType',                       
   compatible         => '8.1');
END;

BEGIN
dbms_aqadm.create_queue (                                   
   queue_name   => 'OS_bookedorders_que',        
   queue_table  => 'OS_orders_pr_mqtab');
END;

Since the representation of orders at the Overseas Shipping site is different from the representation of orders at the Order Entry site, a transformation is applied before messages are propagated from the Order Entry site to the Overseas Shipping site.

/*  Add a rule-based subscriber (for Overseas Shipping) to the Booked orders 
queues with Transformation. Overseas Shipping handles all non-US orders: */ 
DECLARE 
  subscriber     aq$_agent; 
BEGIN 
  subscriber := aq$_agent('Overseas_Shipping','OS.OS_bookedorders_que',null); 

  dbms_aqadm.add_subscriber( 
   queue_name     => 'OE.OE_bookedorders_que', 
   subscriber     => subscriber, 
   rule           => 'tab.user_data.orderregion = ''INTERNATIONAL''',
   transformation => 'OS.OE2XML'); 
END; 

For more details on defining transformations that convert the type used by the Order Entry application to the type used by Overseas shipping, see "Creating Transformations".
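
The combination of a subscriber rule and a transformation can be sketched in plain Java. This is an illustrative model, not AQ code: the classes, the deliver method, and the XML element names produced by toXml are all hypothetical stand-ins, and only the rule on orderregion is taken from the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

/* Illustrative model of a rule-based subscriber with a transformation. */
class RuleBasedSubscribers {

    static class Order {
        final int orderno;
        final String orderregion;
        Order(int orderno, String orderregion) {
            this.orderno = orderno;
            this.orderregion = orderregion;
        }
    }

    static class Subscriber {
        final String name;
        final Predicate<Order> rule;
        final Function<Order, String> transformation;
        Subscriber(String name, Predicate<Order> rule,
                   Function<Order, String> transformation) {
            this.name = name;
            this.rule = rule;
            this.transformation = transformation;
        }
    }

    /* Stand-in for a transformation to XML; the element names are made up. */
    static String toXml(Order o) {
        return "<ORDER><ORDERNO>" + o.orderno + "</ORDERNO><REGION>"
                + o.orderregion + "</REGION></ORDER>";
    }

    /* Returns the transformed payload for each subscriber whose rule matches. */
    static List<String> deliver(Order o, List<Subscriber> subscribers) {
        List<String> out = new ArrayList<>();
        for (Subscriber s : subscribers) {
            if (s.rule.test(o)) {
                out.add(s.name + ": " + s.transformation.apply(o));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Subscriber> subs = Arrays.asList(
            new Subscriber("Overseas_Shipping",
                           o -> "INTERNATIONAL".equals(o.orderregion),
                           RuleBasedSubscribers::toXml));
        System.out.println(deliver(new Order(1, "INTERNATIONAL"), subs));
        System.out.println(deliver(new Order(2, "WESTERN"), subs));
    }
}
```

Only the international order reaches the subscriber, and it arrives in the transformed representation. The second order matches no rule, so nothing is delivered.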

Assume that an application processes orders for customers in Canada. This application can dequeue messages using the following procedure:

/*  Create a procedure to dequeue orders for Canada: */ 
create or replace procedure get_canada_orders as 
deq_msgid                RAW(16); 
dopt                     dbms_aq.dequeue_options_t; 
mprop                    dbms_aq.message_properties_t; 
deq_order_data           SYS.XMLType; 
no_messages              exception; 
pragma exception_init    (no_messages, -25228); 
new_orders               BOOLEAN := TRUE; 
  
begin 
        dopt.wait := 1;

/* Specify dequeue condition to select Orders for Canada */ 
        dopt.deq_condition := 'tab.user_data.extract(
''/ORDER_TYP/CUSTOMER/COUNTRY/text()'').getStringVal()=''CANADA'''; 
        dopt.consumer_name := 'Overseas_Shipping'; 

        WHILE (new_orders) LOOP 
          BEGIN 
            dbms_aq.dequeue( 
                queue_name         => 'OS.OS_bookedorders_que', 
                dequeue_options    => dopt, 
                message_properties => mprop, 
                payload            => deq_order_data, 
                msgid              => deq_msgid); 
            commit; 
            dbms_output.put_line(' Order for Canada - Order: ' || 
                                 deq_order_data.getStringVal()); 
          EXCEPTION 
            WHEN no_messages THEN 
              dbms_output.put_line (' ---- NO MORE ORDERS ---- '); 
              new_orders := FALSE; 
          END; 
        END LOOP; 
end;

Nonpersistent Queues

A message in a nonpersistent queue is not stored in a database table. You create a nonpersistent queue, which can be either a single-consumer or multiconsumer type. These queues are created in a system-created queue table (AQ$_MEM_SC for single-consumer queues and AQ$_MEM_MC for multiconsumer queues) in the schema specified by the create_np_queue command. Subscribers can be added to the multiconsumer queues (see "Creating a Nonpersistent Queue"). Nonpersistent queues can be destinations for propagation.

You use the enqueue interface to enqueue messages into a nonpersistent queue in the normal way. You can enqueue RAW and Object Type (ADT) messages into a nonpersistent queue. You retrieve messages from a nonpersistent queue through the asynchronous notification mechanism, registering for the notification (using OCISubscriptionRegister or DBMS_AQ.REGISTER) for the queues you are interested in (see "Registering for Notification").

When a message is enqueued into a queue, it is delivered to clients with active registrations for the queue. The messages are published to the interested clients without incurring the overhead of storing them in the database.
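
The delivery behavior described above can be approximated with a small in-memory model. The Java sketch below is illustrative only and uses neither AQ nor OCI. The class and method names are hypothetical, and the model assumes that a consumer without an active registration simply receives nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/* Illustrative in-memory model of notification delivery. */
class NonpersistentDelivery {

    /* Active registrations: consumer name -> callbacks. */
    private final Map<String, List<Consumer<String>>> registrations = new HashMap<>();

    void register(String consumer, Consumer<String> callback) {
        registrations.computeIfAbsent(consumer, k -> new ArrayList<>()).add(callback);
    }

    /* Passes the message to every callback registered for the consumer and
       returns how many were notified. Nothing is stored in this model. */
    int enqueue(String consumer, String message) {
        List<Consumer<String>> callbacks = registrations.get(consumer);
        if (callbacks == null) {
            return 0;
        }
        for (Consumer<String> cb : callbacks) {
            cb.accept(message);
        }
        return callbacks.size();
    }

    public static void main(String[] args) {
        NonpersistentDelivery queue = new NonpersistentDelivery();
        queue.register("APP1", m -> System.out.println("APP1 received: " + m));
        System.out.println(queue.enqueue("APP1", "hello"));
        System.out.println(queue.enqueue("APP2", "hello"));
    }
}
```

The registered callback runs as part of the enqueue call, and the message is never written anywhere, which mirrors the absence of table storage for nonpersistent queues.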


See Also:

Documentation on DBMS_AQ.REGISTER in Oracle9i Supplied PL/SQL Packages and Types Reference and documentation on OCISubscriptionRegister in Oracle Call Interface Programmer's Guide.


Scenario

Assume that there are three application processes servicing user requests at the Order Entry system. The connection dispatcher shares out connection requests from the application processes. It attempts to maintain a count of the number of users logged on to the Order Entry system and the number of users for each application process. The application processes are named APP1, APP2, and APP3. (Application process failures are not considered in this example.)

Using nonpersistent queues meets the requirements in this scenario. When a user logs on to the database, the application process enqueues to the multiconsumer nonpersistent queue, LOGON_LOGOFF, with the application name as the consumer name. The same process occurs when a user logs off. To distinguish between the two events, the correlation of the message is LOGON for logons and LOGOFF for logoffs.

The callback function counts the login/logout events for each application process. Note that the dispatcher process needs to connect to the database for registering the subscriptions only. The notifications themselves can be received while the process is disconnected from the database.
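
The counting logic of the callback can be sketched independently of OCI. The Java class below is an illustrative model with hypothetical names. It uses the correlation values LOGON and LOGOFF and the consumer names APP1, APP2, and APP3 that appear in the example code for this scenario.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/* Illustrative model of the per-application logon/logoff bookkeeping. */
class LogonCounter {

    /* Per application process: element 0 counts logons, element 1 logoffs. */
    private final Map<String, int[]> counts = new LinkedHashMap<>();

    LogonCounter(String... apps) {
        for (String app : apps) {
            counts.put(app, new int[2]);
        }
    }

    /* Records one notification. Returns false when the consumer name or
       the correlation is not one of the expected values. */
    boolean onNotification(String consumer, String correlation) {
        int[] c = counts.get(consumer);
        if (c == null) {
            return false;
        }
        if ("LOGON".equals(correlation)) {
            c[0]++;
        } else if ("LOGOFF".equals(correlation)) {
            c[1]++;
        } else {
            return false;
        }
        return true;
    }

    /* Users currently logged on through the given application process. */
    int loggedOn(String app) {
        int[] c = counts.get(app);
        return c == null ? 0 : c[0] - c[1];
    }

    public static void main(String[] args) {
        LogonCounter counter = new LogonCounter("APP1", "APP2", "APP3");
        counter.onNotification("APP1", "LOGON");
        counter.onNotification("APP1", "LOGON");
        counter.onNotification("APP1", "LOGOFF");
        System.out.println("APP1 : " + counter.loggedOn("APP1"));
    }
}
```

As in the C callback, the current count for an application process is the number of logons minus the number of logoffs.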

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT oe/oe; 
/* Create the Object Type/ADT adtmsg */
CREATE OR REPLACE TYPE adtmsg AS OBJECT (id NUMBER, data VARCHAR2(4000));
/

/* Create the multiconsumer nonpersistent queue in OE schema: */ 
EXECUTE dbms_aqadm.create_np_queue(queue_name         => 'LOGON_LOGOFF', 
                                   multiple_consumers => TRUE);                   
 
/* Enable the queue for enqueue and dequeue: */
EXECUTE dbms_aqadm.start_queue(queue_name => 'LOGON_LOGOFF'); 
 
/* Nonpersistent Queue Scenario - procedure to be executed upon logon: */ 
CREATE OR REPLACE PROCEDURE  User_Logon(app_process IN VARCHAR2)  
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        RAW(1); 
BEGIN 
  /* visibility must always be immediate for NonPersistent queues */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGON'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
  /* payload is NULL */ 
  dbms_aq.enqueue( 
        queue_name         => 'LOGON_LOGOFF', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);  
 
END; 
/ 
 
/* Nonpersistent queue scenario - procedure to be executed upon logoff: */ 
CREATE OR REPLACE PROCEDURE  User_Logoff(app_process IN VARCHAR2) 
AS 
  msgprop        dbms_aq.message_properties_t; 
  enqopt         dbms_aq.enqueue_options_t; 
  enq_msgid      RAW(16); 
  payload        adtmsg; 
BEGIN 
 /* Visibility must always be immediate for NonPersistent queues: */ 
  enqopt.visibility:=dbms_aq.IMMEDIATE; 
  msgprop.correlation:= 'LOGOFF'; 
  msgprop.recipient_list(0) := aq$_agent(app_process, NULL, NULL); 
 /* Payload is NOT NULL: */ 
payload := adtmsg(1, 'Logging Off');  

dbms_aq.enqueue( 
        queue_name         => 'LOGON_LOGOFF', 
        enqueue_options    => enqopt, 
        message_properties => msgprop, 
        payload            => payload, 
        msgid              => enq_msgid);  
 END; 
/ 
 
  
/* If there is a logon at APP1, enqueue a message into 'LOGON_LOGOFF' with 
 correlation 'LOGON': */ 
EXECUTE User_logon('APP1'); 
 
/* If there is a logoff at APP3, enqueue a message into 'LOGON_LOGOFF' with 
   correlation 'LOGOFF' and payload adtmsg(1, 'Logging Off'): */ 
EXECUTE User_logoff('APP3'); 
 
 /* The OCI program which waits for notifications: */ 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <oci.h> 
#ifdef WIN32COMMON 
#define sleep(x)   Sleep(1000*(x)) 
#endif 
 
/* LOGON / password:  */ 
static text *username = (text *) "OE"; 
static text *password = (text *) "OE"; 
 
/* The correlation strings of messages: */ 
static char  *logon = "LOGON"; 
static char  *logoff = "LOGOFF"; 
 
/* The possible consumer names of queues: */ 
static char *applist[] = {"APP1", "APP2","APP3"}; 
 
static OCIEnv *envhp; 
static OCIServer *srvhp; 
static OCIError *errhp; 
static OCISvcCtx *svchp; 
 
static void checkerr(/*_ OCIError *errhp, sword status _*/); 
 
struct process_statistics 
{ 
  ub4  logon; 
  ub4  logoff; 
}; 
 
typedef struct process_statistics process_statistics; 
 
int main(/*_ int argc, char *argv[] _*/); 
  
/* Notify Callback: */ 
ub4 notifyCB(ctx, subscrhp, pay, payl, desc, mode) 
dvoid *ctx; 
OCISubscription *subscrhp; 
dvoid *pay; 
ub4    payl; 
dvoid *desc; 
ub4    mode; 
{ 
 text                *subname;   /* subscription name */ 
 ub4                  lsub;      /* length of subscription name */ 
 text                *queue;    /* queue name */ 
 ub4                  lqueue;    /* length of queue name */ 
 text                *consumer;  /* consumer name */ 
 ub4                  lconsumer;   
 text                *correlation; 
 ub4                  lcorrelation; 
 ub4                  size; 
 ub4                  appno; 
 OCIRaw              *msgid;               
 OCIAQMsgProperties  *msgprop;   /* message properties descriptor */ 
 process_statistics   *user_count = (process_statistics *)ctx; 
 
 OCIAttrGet((dvoid *)subscrhp, OCI_HTYPE_SUBSCRIPTION, 
                             (dvoid *)&subname, &lsub, 
                             OCI_ATTR_SUBSCR_NAME, errhp); 
 
 /* Extract the attributes from the AQ descriptor: */ 
 /* Queue name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&queue, &size,  
            OCI_ATTR_QUEUE_NAME, errhp); 
   
 /* Consumer name: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&consumer, &lconsumer,  
            OCI_ATTR_CONSUMER_NAME, errhp); 
 
 /* Message properties: */ 
 OCIAttrGet(desc, OCI_DTYPE_AQNFY_DESCRIPTOR, (dvoid *)&msgprop, &size,  
            OCI_ATTR_MSG_PROP, errhp); 
 
 /* Get correlation from message properties: */ 
  checkerr(errhp, OCIAttrGet(msgprop, OCI_DTYPE_AQMSG_PROPERTIES,  
                             (dvoid *)&correlation, &lcorrelation,  
                             OCI_ATTR_CORRELATION, errhp)); 
   
  if (lconsumer == strlen(applist[0])) 
  { 
    if (!memcmp((dvoid *)consumer, (dvoid *)applist[0], strlen(applist[0]))) 
     appno = 0; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[1], 
strlen(applist[1]))) 
     appno = 1; 
    else if (!memcmp((dvoid *)consumer, (dvoid *)applist[2], 
strlen(applist[2]))) 
     appno = 2; 
    else  
    { 
     printf("Wrong consumer in notification"); 
     return; 
    } 
  } 
  else 
  {  /* consumer name must be "APP1", "APP2" or "APP3"  */ 
    printf("Wrong consumer in notification");   
    return; 
  } 
 
  if (lcorrelation == strlen(logon) &&                   /* logon event */ 
       !memcmp((dvoid *)correlation, (dvoid *)logon, strlen(logon))) 
  { 
     user_count[appno].logon++; 
                           /* increment logon count for the app process */     
     printf("Logon by APP%d \n", (appno+1));  
     printf("Logon Payload length = %d \n", payl);  
   } 
  else if  (lcorrelation == strlen(logoff) &&           /* logoff event */ 
       !memcmp((dvoid *)correlation,(dvoid *)logoff, strlen(logoff))) 
  { 
     user_count[appno].logoff++;  
                         /* increment logoff count for the app process */ 
     printf("Logoff by APP%d \n", (appno+1));  
     printf("Logoff Payload length = %d \n", payl);  
  }  
  else                      /* correlation must be "LOGON" or "LOGOFF" */ 
    printf("Wrong correlation in notification");   
 
  printf("Total  : \n"); 
 
  printf("App1 : %d \n", user_count[0].logon-user_count[0].logoff); 
  printf("App2 : %d \n", user_count[1].logon-user_count[1].logoff); 
  printf("App3 : %d \n", user_count[2].logon-user_count[2].logoff); 
 
} 
 
int main(argc, argv) 
int argc; 
char *argv[]; 
{ 
  OCISession *authp = (OCISession *) 0; 
  OCISubscription *subscrhp[3]; 
  ub4 namespace = OCI_SUBSCR_NAMESPACE_AQ; 
  process_statistics  ctx[3] = {{0,0}, {0,0}, {0,0}}; 
  ub4 sleep_time = 0; 
 
  printf("Initializing OCI Process\n"); 
 
 /* Initialize OCI environment with OCI_EVENTS flag set: */ 
  (void) OCIInitialize((ub4) OCI_EVENTS|OCI_OBJECT, (dvoid *)0, 
                       (dvoid * (*)(dvoid *, size_t)) 0, 
                       (dvoid * (*)(dvoid *, dvoid *, size_t))0, 
                       (void (*)(dvoid *, dvoid *)) 0 ); 
 
  printf("Initialization successful\n"); 
 
  printf("Initializing OCI Env\n"); 
  (void) OCIEnvInit( (OCIEnv **) &envhp, OCI_DEFAULT, (size_t) 0, (dvoid **) 0 
); 
  printf("Initialization successful\n"); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &errhp, 
                   OCI_HTYPE_ERROR, (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &srvhp, 
                   OCI_HTYPE_SERVER, (size_t) 0, (dvoid **) 0)); 
 
  checkerr(errhp, OCIHandleAlloc( (dvoid *) envhp, (dvoid **) &svchp, 
                   OCI_HTYPE_SVCCTX, (size_t) 0, (dvoid **) 0)); 
 
  printf("connecting to server\n"); 
  checkerr(errhp, OCIServerAttach( srvhp, errhp, (text *)"inst1_alias", 
           strlen("inst1_alias"), (ub4) OCI_DEFAULT)); 
  printf("connect successful\n"); 
 
 /* Set attribute server context in the service context: */ 
  checkerr(errhp, OCIAttrSet( (dvoid *) svchp, OCI_HTYPE_SVCCTX, (dvoid *)srvhp,  
                    (ub4) 0, OCI_ATTR_SERVER, (OCIError *) errhp)); 
 
  checkerr(errhp, OCIHandleAlloc((dvoid *) envhp, (dvoid **)&authp, 
                       (ub4) OCI_HTYPE_SESSION, (size_t) 0, (dvoid **) 0)); 
  
 /* Set username and password in the session handle: */ 
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) username, (ub4) strlen((char *)username), 
                  (ub4) OCI_ATTR_USERNAME, errhp)); 
  
  checkerr(errhp, OCIAttrSet((dvoid *) authp, (ub4) OCI_HTYPE_SESSION, 
                  (dvoid *) password, (ub4) strlen((char *)password), 
                  (ub4) OCI_ATTR_PASSWORD, errhp)); 
 
 /* Begin session: */ 
  checkerr(errhp, OCISessionBegin ( svchp,  errhp, authp, OCI_CRED_RDBMS,  
                          (ub4) OCI_DEFAULT)); 
 
  (void) OCIAttrSet((dvoid *) svchp, (ub4) OCI_HTYPE_SVCCTX, 
                   (dvoid *) authp, (ub4) 0, 
                   (ub4) OCI_ATTR_SESSION, errhp); 
 
 /* Register for notification: */ 
   printf("allocating subscription handle\n"); 
  subscrhp[0] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[0],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
  
 /* For application process APP1: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP1",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP1"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[0], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
 printf("allocating subscription handle\n"); 
  subscrhp[1] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[1],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
  
 /* For application process APP2: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP2",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP2"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[1], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
   printf("allocating subscription handle\n"); 
  subscrhp[2] = (OCISubscription *)0; 
  (void) OCIHandleAlloc((dvoid *) envhp, (dvoid **)&subscrhp[2],  
                        (ub4) OCI_HTYPE_SUBSCRIPTION, 
                        (size_t) 0, (dvoid **) 0); 
 
 /* For application process APP3: */ 
  printf("setting subscription name\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) "OE.LOGON_LOGOFF:APP3",  
                 (ub4) strlen("OE.LOGON_LOGOFF:APP3"), 
                 (ub4) OCI_ATTR_SUBSCR_NAME, errhp); 
  
  printf("setting subscription callback\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) notifyCB, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_CALLBACK, errhp); 
 
 (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *)&ctx, (ub4)sizeof(ctx), 
                 (ub4) OCI_ATTR_SUBSCR_CTX, errhp); 
 
  printf("setting subscription namespace\n"); 
  (void) OCIAttrSet((dvoid *) subscrhp[2], (ub4) OCI_HTYPE_SUBSCRIPTION, 
                 (dvoid *) &namespace, (ub4) 0, 
                 (ub4) OCI_ATTR_SUBSCR_NAMESPACE, errhp); 
 
  printf("Registering for notifications\n"); 
  checkerr(errhp, OCISubscriptionRegister(svchp, subscrhp, 3, errhp,  
                                          OCI_DEFAULT)); 
 
  sleep_time = (ub4)atoi(argv[1]); 
  printf ("waiting for %d s \n", sleep_time); 
  sleep(sleep_time); 
 
  printf("Exiting"); 
  exit(0); 
} 
 
void checkerr(errhp, status) 
OCIError *errhp; 
sword status; 
{ 
  text errbuf[512]; 
  sb4 errcode = 0; 
 
  switch (status) 
  { 
  case OCI_SUCCESS: 
    break; 
  case OCI_SUCCESS_WITH_INFO: 
    (void) printf("Error - OCI_SUCCESS_WITH_INFO\n"); 
    break; 
  case OCI_NEED_DATA: 
    (void) printf("Error - OCI_NEED_DATA\n"); 
    break; 
  case OCI_NO_DATA: 
    (void) printf("Error - OCI_NO_DATA\n"); 
    break; 
  case OCI_ERROR: 
    (void) OCIErrorGet((dvoid *)errhp, (ub4) 1, (text *) NULL, &errcode, 
                        errbuf, (ub4) sizeof(errbuf), OCI_HTYPE_ERROR); 
    (void) printf("Error - %.*s\n", 512, errbuf); 
    break; 
  case OCI_INVALID_HANDLE: 
    (void) printf("Error - OCI_INVALID_HANDLE\n"); 
    break; 
  case OCI_STILL_EXECUTING: 
    (void) printf("Error - OCI_STILL_EXECUTING\n"); 
    break; 
  case OCI_CONTINUE: 
    (void) printf("Error - OCI_CONTINUE\n"); 
    break; 
  default: 
    break; 
  } 
} 
 
/* End of file tkaqdocn.c */ 

Visual Basic (OO4O): Example Code

This feature is not supported currently.

Java (JDBC): Example Code

This feature is not supported through the Java API.

Retention and Message History

Advanced Queuing allows the retention of the message history after consumption. The messages and their histories can be queried using SQL. This allows business analysis of the integrated system. In certain cases, messages need to be tracked. For example, if a message is produced as a result of the consumption of another message, the two are related. As the application designer, you may want to keep track of such relationships. Taken together, retention, message identifiers, and SQL queries make it possible to build powerful message warehouses.

Scenario

Assume that you need to determine the average order processing time. This includes the time the order has to wait in the backed_order queue. You want to know the average wait time in the backed_order queue. SQL queries can determine the wait time for orders in the shipping application. Specify the retention as TRUE for the shipping queues and specify the order number in the correlation field of the message.

For simplicity, only orders that have already been processed are analyzed. The processing time for an order in the shipping application is the difference between its enqueue time in WS_bookedorders_que and its enqueue time in WS_shippedorders_que (see "tkaqdoca.sql: Script to Create Users, Objects, Queue Tables, Queues & Subscribers" in Appendix C, "Scripts for Implementing BooksOnLine").

PL/SQL (DBMS_AQADM Package): Example Code

/* Average processing time in the shipping application. Queue names are 
   stored in upper case, so the literals are written in upper case: */ 
SELECT SUM(SO.enq_time - BO.enq_time) / COUNT(*) AVG_PRCS_TIME 
   FROM WS.AQ$WS_orders_pr_mqtab BO, WS.AQ$WS_orders_mqtab SO 
   WHERE SO.msg_state = 'PROCESSED' AND BO.msg_state = 'PROCESSED' 
   AND SO.corr_id = BO.corr_id AND SO.queue = 'WS_SHIPPEDORDERS_QUE'; 
 
/* Average waiting time in the back orders queue: */ 
SELECT SUM(BACK.deq_time - BACK.enq_time) / COUNT(*) AVG_BACK_TIME 
   FROM WS.AQ$WS_orders_mqtab BACK 
   WHERE BACK.msg_state = 'PROCESSED' AND BACK.queue = 'WS_BACKORDERS_QUE'; 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

No example is provided with this release.
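
As a rough illustration of the arithmetic only, the following plain-Java sketch models the average processing time calculation in memory. It does not use JDBC or the AQ Java API. The class and method names (ProcessingTimeSketch, averageMillis) and the sample data are hypothetical. The two maps stand in for the booked orders and shipped orders views, keyed by correlation ID, and only orders present in both are averaged, mirroring the join on corr_id in the PL/SQL query.

```java
import java.util.HashMap;
import java.util.Map;

/** In-memory model of the average-processing-time calculation (hypothetical names). */
final class ProcessingTimeSketch {

    /**
     * Both maps go from correlation ID (the order number) to an enqueue time
     * in milliseconds. Orders that have no shipped entry yet are skipped.
     */
    static double averageMillis(Map<String, Long> bookedEnq, Map<String, Long> shippedEnq) {
        long total = 0;
        int matched = 0;
        for (Map.Entry<String, Long> booked : bookedEnq.entrySet()) {
            Long shipped = shippedEnq.get(booked.getKey());
            if (shipped != null) {               // join on correlation ID
                total += shipped - booked.getValue();
                matched++;
            }
        }
        if (matched == 0) {
            throw new IllegalArgumentException("no order appears in both maps");
        }
        return (double) total / matched;
    }

    public static void main(String[] args) {
        Map<String, Long> booked = new HashMap<>();
        Map<String, Long> shipped = new HashMap<>();
        booked.put("1001", 0L);
        shipped.put("1001", 4000L);
        booked.put("1002", 1000L);
        shipped.put("1002", 3000L);
        booked.put("1003", 2000L);               // not shipped yet, ignored
        System.out.println(averageMillis(booked, shipped));   // prints 3000.0
    }
}
```

In a real application the same figure would come from the queue table views, as in the PL/SQL example; the sketch only makes the join and the averaging explicit.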

Publish-Subscribe Support

Advanced Queuing supports the publish-subscribe model of application integration. In this model, publishing applications put messages in a queue, and subscribing applications subscribe to messages in that queue. More publishing and subscribing applications can be added dynamically without changing the existing ones. Advanced Queuing also supports content-based subscriptions: a subscriber can subscribe to a subset of the messages in a queue based on the message properties and the message contents. A subscriber to a queue can also be another queue or a consumer on another queue.

You can implement a publish-subscribe model of communication using Advanced Queuing as follows:

Scenario

The BooksOnLine application illustrates the use of a publish-subscribe model for communicating between applications. The following subsections give some examples.

Defining Queues

The Order Entry application defines a queue (OE_bookedorders_que) to communicate booked orders to various applications. The Order Entry application is not aware of the subscriber applications, so a new subscriber application can be added without disrupting any setup or logic in the Order Entry (publisher) application.

Setting Up Subscriptions

The various shipping applications and the customer service application (that is, Eastern region shipping, Western region shipping, Overseas shipping and Customer Service) are defined as subscribers to the booked_orders queue of the Order Entry application. Rules are used to route messages of interest to the various subscribers. Thus, Eastern Region shipping, which handles shipment of all orders for the East coast and all rush U.S. orders, expresses the subscription rule as follows:

rule  => 'tab.user_data.orderregion = ''EASTERN'' OR 
(tab.user_data.ordertype = ''RUSH'' AND  
tab.user_data.customer.country = ''USA'') ' 
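
To make the logic of this rule concrete, here is a minimal plain-Java sketch of the same Boolean condition. It is only a model: AQ evaluates the rule inside the database against the message payload, and the class and method names here (EasternRuleSketch, matches) are hypothetical.

```java
/** Plain-Java model of the Eastern Region subscription rule (illustrative only). */
final class EasternRuleSketch {

    /** Returns true when the rule would select the order for Eastern Region shipping. */
    static boolean matches(String orderRegion, String orderType, String country) {
        return "EASTERN".equals(orderRegion)
                || ("RUSH".equals(orderType) && "USA".equals(country));
    }

    public static void main(String[] args) {
        System.out.println(matches("EASTERN", "NORMAL", "USA"));    // prints true
        System.out.println(matches("WESTERN", "RUSH", "USA"));      // prints true
        System.out.println(matches("WESTERN", "RUSH", "Canada"));   // prints false
    }
}
```

The second call shows why the rule needs both clauses: a rush U.S. order goes to Eastern Region shipping even when its region is not EASTERN.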
 

Each subscriber can specify a local queue where messages are to be delivered. The Eastern region shipping application specifies a local queue (ES_bookedorders_que) for message delivery by specifying the subscriber address as follows:

subscriber := aq$_agent('East_Shipping', 'ES.ES_bookedorders_que', null); 

Setting Up Propagation

Enable propagation from each publisher application queue. To allow subscribed messages to be delivered to remote queues, the Order Entry application enables propagation by means of the following statement:

execute dbms_aqadm.schedule_propagation(queue_name => 'OE.OE_bookedorders_que');  

Publishing Messages

Booked orders are published by the Order Entry application when it enqueues orders that have been validated and are ready for shipping into OE_bookedorders_que. These messages are then routed to each of the subscribing applications. Messages are delivered to local queues (if specified) at each of the subscriber applications.

Receiving Messages

Each of the shipping applications and the Customer Service application will then receive these messages in their local queues. For example, Eastern Region Shipping only receives booked orders that are for East Coast addresses or any U.S. order that is marked RUSH. This application then dequeues messages and processes its orders for shipping.

Support for Oracle Real Application Clusters

Real Application Clusters can be used to improve AQ performance by allowing different queues to be managed by different instances. You do this by specifying different instance affinities (preferences) for the queue tables that store the queues. This allows queue operations (enqueue and dequeue) on different queues to occur in parallel.

The AQ queue monitor process continuously monitors the instance affinities of the queue tables. The queue monitor assigns ownership of a queue table to the specified primary instance if it is available, failing which it assigns it to the specified secondary instance.

If the owner instance of a queue table terminates, the queue monitor changes ownership to a suitable instance such as the secondary instance.
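
The ownership decision described above can be sketched as a small plain-Java function. This is a model, not the queue monitor's actual algorithm. The names (QueueTableOwnerSketch, chooseOwner) are hypothetical, and the final fallback to the lowest-numbered available instance is an assumption, because the text only says "a suitable instance".

```java
import java.util.Set;
import java.util.TreeSet;

/** Model of how an owner instance might be picked for a queue table (illustrative only). */
final class QueueTableOwnerSketch {

    static int chooseOwner(int primary, int secondary, Set<Integer> available) {
        if (available.contains(primary)) {
            return primary;                      // preferred owner
        }
        if (available.contains(secondary)) {
            return secondary;                    // primary is down
        }
        if (available.isEmpty()) {
            throw new IllegalStateException("no instance available");
        }
        // Assumption: any surviving instance is "suitable"; pick the lowest number.
        return new TreeSet<Integer>(available).first();
    }

    public static void main(String[] args) {
        System.out.println(chooseOwner(2, 1, Set.of(1, 2)));   // prints 2
        System.out.println(chooseOwner(2, 1, Set.of(1)));      // prints 1
    }
}
```

The second call corresponds to a single-instance database: a queue table whose primary instance is 2 is still owned by instance 1 because that is the only instance running.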

AQ propagation can make use of Real Application Clusters, although this is transparent to the user. The affinities for jobs submitted on behalf of the propagation schedules are set to the same values as the affinities of the respective queue tables. Thus a job_queue_process associated with the owner instance of a queue table handles the propagation from queues stored in that queue table, thereby minimizing pinging. Additional discussion on this topic can be found under AQ propagation scheduling (see "Scheduling a Queue Propagation" in Chapter 9, "Administrative Interface").


See also:

Oracle9i Real Application Clusters Setup and Configuration


Scenario

In the BooksOnLine example, operations on the OE_neworders_que and OE_bookedorders_que queues at the order entry (OE) site can be made faster if the two queues are associated with different instances. This is done by creating the queues in different queue tables and specifying different affinities for the queue tables in the create_queue_table() command.

In the example, the queue table OE_orders_sqtab stores queue OE_neworders_que, and its primary and secondary instances are 1 and 2 respectively. Queue table OE_orders_mqtab stores queue OE_bookedorders_que, and its primary and secondary instances are 2 and 1 respectively. The objective is to let instances 1 and 2 manage the two queues in parallel. By default, only one instance is available, in which case the owner instance of both queue tables is set to instance 1. However, if Real Application Clusters is set up correctly and both instances 1 and 2 are available, then queue table OE_orders_sqtab is owned by instance 1 and the other queue table is owned by instance 2. The primary and secondary instance specification of a queue table can be changed dynamically using the alter_queue_table() command, as shown in the following example. Information about the primary, secondary, and owner instance of a queue table can be obtained by querying the view USER_QUEUE_TABLES (see "Selecting Queue Tables in User Schema" in "Administrative Interface: Views").


Note:

Queue names and queue table names are converted to upper case. Mixed case (upper and lower case together) is not supported for queue names and queue table names.


PL/SQL (DBMS_AQADM Package): Example Code

/* Create queue tables, queues for OE  */
CONNECT OE/OE; 
/* A trailing hyphen is the SQL*Plus line-continuation character. */
EXECUTE dbms_aqadm.create_queue_table( -
        queue_table        => 'OE_orders_sqtab', -
        comment            => 'Order Entry Single-Consumer Orders queue table', -
        queue_payload_type => 'BOLADM.order_typ', -
        compatible         => '8.1', -
        primary_instance   => 1, -
        secondary_instance => 2);
  
EXECUTE dbms_aqadm.create_queue_table( -
        queue_table        => 'OE_orders_mqtab', -
        comment            => 'Order Entry Multi Consumer Orders queue table', -
        multiple_consumers => TRUE, -
        queue_payload_type => 'BOLADM.order_typ', -
        compatible         => '8.1', -
        primary_instance   => 2, -
        secondary_instance => 1); 
  
EXECUTE dbms_aqadm.create_queue ( -
        queue_name         => 'OE_neworders_que', -
        queue_table        => 'OE_orders_sqtab'); 
  
EXECUTE dbms_aqadm.create_queue ( -
        queue_name         => 'OE_bookedorders_que', -
        queue_table        => 'OE_orders_mqtab'); 
  
/* Check instance affinity of OE queue tables from AQ administrative view: */ 
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 
  
/* Alter instance affinity of OE queue tables: */ 
EXECUTE dbms_aqadm.alter_queue_table( -
        queue_table        => 'OE.OE_orders_sqtab', -
        primary_instance   => 2, -
        secondary_instance => 1); 
  
EXECUTE dbms_aqadm.alter_queue_table( -
        queue_table        => 'OE.OE_orders_mqtab', -
        primary_instance   => 1, -
        secondary_instance => 2); 
  
/* Check instance affinity of OE queue tables from AQ administrative view: */
SELECT queue_table, primary_instance, secondary_instance, owner_instance 
FROM user_queue_tables; 

Visual Basic (OO4O): Example Code

This feature is not currently supported.

Java (JDBC): Example Code

public static void createQueueTablesAndQueues(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;      
    AQQueueTable         mq_table;      
    AQQueueProperty      q_prop;
    AQQueue              neworders_q;   
    AQQueue              bookedorders_q;        

    try
    {
        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a single-consumer orders queue table */
        sqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        sqt_prop.setComment("Order Entry Single-Consumer Orders queue table");
        sqt_prop.setCompatible("8.1");
        sqt_prop.setPrimaryInstance(1);
        sqt_prop.setSecondaryInstance(2);

        sq_table = aq_sess.createQueueTable("OE", "OE_orders_sqtab", sqt_prop);

        /* Create a multiconsumer orders queue table */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Order Entry Multi Consumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);
        mqt_prop.setPrimaryInstance(2);
        mqt_prop.setSecondaryInstance(1);

        mq_table = aq_sess.createQueueTable("OE", "OE_orders_mqtab", mqt_prop);
        
    
        /* Create Queues in these queue tables */
        q_prop = new AQQueueProperty();

        neworders_q = aq_sess.createQueue(sq_table, "OE_neworders_que", 
                                          q_prop);
        
        bookedorders_q = aq_sess.createQueue(mq_table, "OE_bookedorders_que", 
                                             q_prop);
  
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

public static void alterInstanceAffinity(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty sqt_prop;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         sq_table;      
    AQQueueTable         mq_table;      
    AQQueueProperty      q_prop;

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Check instance affinities */
        sq_table = aq_sess.getQueueTable("OE", "OE_orders_sqtab");

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " + 
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " + 
                           mqt_prop.getPrimaryInstance());
    
        /* Alter queue table affinities */
        sq_table.alter(null, 2, 1);

        mq_table.alter(null, 1, 2);

        sqt_prop = sq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_sqtab: " + 
                           sqt_prop.getPrimaryInstance());

        mq_table = aq_sess.getQueueTable("OE", "OE_orders_mqtab");
        mqt_prop = mq_table.getProperty();
        System.out.println("Current primary instance for OE_orders_mqtab: " + 
                           mqt_prop.getPrimaryInstance());
  
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

Support for Statistics Views

Each instance keeps its own AQ statistics information in its own SGA, and does not have knowledge of the statistics gathered by other instances. When a GV$AQ view is queried by an instance, all other instances funnel their AQ statistics information to the instance issuing the query.

Scenario

The GV$AQ view can be queried at any time to see the number of messages in the waiting, ready, or expired state. The view also displays the average number of seconds that messages have been waiting to be processed. The order processing application can use this information to dynamically tune the number of order-processing processes (see "Selecting the Number of Messages in Different States for the Whole Database" in Chapter 10, "Administrative Interface: Views").

PL/SQL (DBMS_AQADM Package): Example Code

CONNECT oe/oe 
 
/* Count the number of ready messages and the average time for which the 
   messages have been waiting. Queue names are stored in upper case: */ 
SELECT READY, AVERAGE_WAIT FROM gv$aq Stats, user_queues Qs 
  WHERE Stats.qid = Qs.qid AND Qs.Name = 'OE_NEWORDERS_QUE'; 

Visual Basic (OO4O): Example Code

Use the dbexecutesql interface from the database for this functionality.

Java (JDBC): Example Code

No example is provided with this release.

Internet Access

See Chapter 17, "Internet Access to Advanced Queuing" for information on Internet access to Advanced Queuing features.

Enqueue Features

In this section, the following topics are discussed:

Subscriptions and Recipient Lists

After consumption by dequeue, messages are retained for the time specified in retention_time. When retention_time expires, messages are removed by the time manager process.

After processing, the message is removed if the retention_time of the queue is 0; otherwise it is retained for the specified retention time. While the message is retained, it can be either queried using SQL on the queue table view or dequeued in BROWSE mode by specifying the message ID of the processed message.

Advanced Queuing allows a single message to be processed and consumed by more than one consumer. To use this feature, you must create multiconsumer queues and enqueue the messages into these multiconsumer queues. Advanced Queuing allows two methods of identifying the list of consumers for a message: subscriptions and recipient lists.

Subscriptions

You can add a subscription to a queue by using the DBMS_AQADM.ADD_SUBSCRIBER PL/SQL procedure (see "Adding a Subscriber" in Chapter 9, "Administrative Interface"). This lets you specify a consumer by means of the AQ$_AGENT parameter for enqueued messages. You can add more subscribers by repeatedly using the DBMS_AQADM.ADD_SUBSCRIBER procedure up to a maximum of 1024 subscribers for a multiconsumer queue.

All consumers that are added as subscribers to a multiconsumer queue must have unique values for the AQ$_AGENT parameter. This means that two subscribers cannot have the same values for the NAME, ADDRESS and PROTOCOL attributes for the AQ$_AGENT type. At least one of the three attributes must be different for two subscribers (see "Agent Type (aq$_agent)" for formal description of this data structure).
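
The uniqueness constraint can be pictured with a small plain-Java model. This is a sketch under stated assumptions and not the DBMS_AQADM implementation: the class name SubscriberRegistrySketch is hypothetical, a duplicate is simply rejected by returning false (how AQ itself reports the condition is not modelled), and the 1024 limit is taken from the text above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Model of subscriber uniqueness on a multiconsumer queue (illustrative only). */
final class SubscriberRegistrySketch {

    static final int MAX_SUBSCRIBERS = 1024;

    // Each agent is stored as its (name, address, protocol) triple.
    private final Set<List<Object>> agents = new HashSet<>();

    /** Returns false if an agent with the same three attribute values already exists. */
    boolean addSubscriber(String name, String address, Integer protocol) {
        List<Object> agent = Arrays.<Object>asList(name, address, protocol);
        if (agents.contains(agent)) {
            return false;                        // all three attributes match
        }
        if (agents.size() >= MAX_SUBSCRIBERS) {
            throw new IllegalStateException("subscriber limit reached");
        }
        return agents.add(agent);
    }

    public static void main(String[] args) {
        SubscriberRegistrySketch queue = new SubscriberRegistrySketch();
        System.out.println(queue.addSubscriber("East_Shipping", "ES.ES_bookedorders_que", null)); // prints true
        System.out.println(queue.addSubscriber("East_Shipping", "ES.ES_bookedorders_que", null)); // prints false
        System.out.println(queue.addSubscriber("East_Shipping", null, null));                     // prints true
    }
}
```

The third call succeeds because the address differs, which is enough to make the agent distinct even though the name is reused.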

You cannot add subscriptions to single-consumer queues or exception queues. A consumer that is added as a subscriber to a queue will only be able to dequeue messages that are enqueued after the DBMS_AQADM.ADD_SUBSCRIBER procedure is completed. In other words, messages that had been enqueued before this procedure is executed will not be available for dequeue by this consumer.

You can remove a subscription by using the DBMS_AQADM.REMOVE_SUBSCRIBER procedure (see "Removing a Subscriber" in Chapter 9, "Administrative Interface"). AQ will automatically remove from the queue all data corresponding to the consumer identified by the AQ$_AGENT parameter. In other words, it is not an error to execute the REMOVE_SUBSCRIBER procedure even when there are pending messages that are available for dequeue by the consumer. These messages will be automatically made unavailable for dequeue after the REMOVE_SUBSCRIBER procedure is executed. In a queue table that is created with the compatible parameter set to '8.1' or higher, such messages that were not dequeued by the consumer will be shown as "UNDELIVERABLE" in the AQ$<queue_table> view. Note that a multiconsumer queue table created without the compatible parameter, or with the compatible parameter set to '8.0', does not display the state of a message on a consumer basis, but only displays the global state of the message.

Recipient Lists

You do not need to specify subscriptions for a multiconsumer queue if the producers of messages for enqueue supply a recipient list of consumers. In some situations it may be desirable to enqueue a message that is targeted to a specific set of consumers rather than the default list of subscribers. You accomplish this by specifying a recipient list at the time of enqueuing the message.

If a recipient list is specified during enqueue, it overrides the subscription list. In other words, messages that have a specified recipient list will not be available for dequeue by the subscribers of the queue. The consumers specified in the recipient list may or may not be subscribers for the queue. It is an error if the queue does not have any subscribers and the enqueue does not specify a recipient list (see "Enqueuing a Message" in Chapter 11, "Operational Interface: Basic Operations").
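
The override behavior can be summarized in a short plain-Java sketch. It models only the selection logic described above; the names (ConsumerResolutionSketch, consumersFor) are hypothetical, consumers are represented by plain strings, and the exception type stands in for whatever error AQ actually raises.

```java
import java.util.List;

/** Model of how the consumers of a message are determined at enqueue (illustrative only). */
final class ConsumerResolutionSketch {

    static List<String> consumersFor(List<String> subscribers, List<String> recipientList) {
        if (recipientList != null && !recipientList.isEmpty()) {
            return recipientList;                // recipient list overrides subscriptions
        }
        if (subscribers.isEmpty()) {
            throw new IllegalStateException("no subscribers and no recipient list");
        }
        return subscribers;                      // default: all subscribers of the queue
    }

    public static void main(String[] args) {
        List<String> subs = List.of("East_Shipping", "West_Shipping");
        System.out.println(consumersFor(subs, null));                   // prints [East_Shipping, West_Shipping]
        System.out.println(consumersFor(subs, List.of("Auditor")));     // prints [Auditor]
    }
}
```

In the second call neither subscriber can dequeue the message, and the recipient does not have to be a subscriber of the queue.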

Priority and Ordering of Messages

The message ordering dictates the order that messages are dequeued from a queue. The ordering method for a queue is specified when a queue table is created (see "Creating a Queue Table" in Chapter 9, "Administrative Interface").

Priority ordering of messages is achieved by specifying priority, enqueue time as the sort order for the message. If priority ordering is chosen, each message will be assigned a priority at enqueue time by the enqueuer. At dequeue time, the messages will be dequeued in the order of the priorities assigned. If two messages have the same priority, the order in which they are dequeued is determined by the enqueue time. A first-in, first-out (FIFO) priority queue can also be created by specifying the enqueue time, priority as the sort order of the messages.
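
The difference between the two sort orders can be seen in a small plain-Java model that sorts messages with a two-key comparator. This is a sketch, not the AQ dequeue implementation: the names (SortOrderSketch, Msg, dequeueOrder) are hypothetical, enqueue times are plain numbers, and a smaller priority number is assumed to mean higher priority.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Model of the two sort orders discussed above (illustrative only). */
final class SortOrderSketch {

    static final class Msg {
        final String id;
        final int priority;      // assumption: smaller number = higher priority
        final long enqTime;

        Msg(String id, int priority, long enqTime) {
            this.id = id;
            this.priority = priority;
            this.enqTime = enqTime;
        }
    }

    /** Models sort_list => 'priority,enq_time'. */
    static final Comparator<Msg> PRIORITY_THEN_ENQ_TIME =
            Comparator.comparingInt((Msg m) -> m.priority)
                      .thenComparingLong(m -> m.enqTime);

    /** Models sort_list => 'enq_time,priority'. */
    static final Comparator<Msg> ENQ_TIME_THEN_PRIORITY =
            Comparator.comparingLong((Msg m) -> m.enqTime)
                      .thenComparingInt(m -> m.priority);

    /** Returns message IDs in the order they would be dequeued. */
    static List<String> dequeueOrder(List<Msg> msgs, Comparator<Msg> order) {
        List<Msg> sorted = new ArrayList<>(msgs);
        sorted.sort(order);
        List<String> ids = new ArrayList<>();
        for (Msg m : sorted) {
            ids.add(m.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Msg> msgs = new ArrayList<>();
        msgs.add(new Msg("A", 3, 1L));
        msgs.add(new Msg("B", 1, 2L));
        msgs.add(new Msg("C", 1, 3L));
        System.out.println(dequeueOrder(msgs, PRIORITY_THEN_ENQ_TIME));   // prints [B, C, A]
        System.out.println(dequeueOrder(msgs, ENQ_TIME_THEN_PRIORITY));   // prints [A, B, C]
    }
}
```

With priority as the leading key, the two priority-1 messages overtake the earlier message A and keep their enqueue order among themselves; with enqueue time as the leading key, priority only breaks ties between messages enqueued at the same time.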

Scenario

In the BooksOnLine application, a customer can request:

The Order Entry application uses a priority queue to store booked orders. Booked orders are propagated to the regional booked orders queues. At each region, orders in these regional booked orders queues are processed in the order of the shipping priorities.

The following calls create the priority queues for the Order Entry application.

PL/SQL (DBMS_AQADM Package): Example Code

/* Create a priority queue table for OE: */ 
/* A trailing hyphen is the SQL*Plus line-continuation character. */
EXECUTE dbms_aqadm.create_queue_table( -
   queue_table         => 'OE_orders_pr_mqtab', -
   sort_list           => 'priority,enq_time', -
   comment             => 'Order Entry Priority MultiConsumer Orders queue table', -
   multiple_consumers  => TRUE, -
   queue_payload_type  => 'BOLADM.order_typ', -
   compatible          => '8.1', -
   primary_instance    => 2, -
   secondary_instance  => 1); 
 
EXECUTE dbms_aqadm.create_queue ( -
   queue_name          => 'OE_bookedorders_que', -
   queue_table         => 'OE_orders_pr_mqtab'); 
 
/* When an order arrives, the order entry application can use the following 
 procedure to enqueue the order into its booked orders queue. A shipping 
 priority is specified for each order: */
CREATE OR REPLACE procedure order_enq(book_title        IN VARCHAR2, 
                                      book_qty          IN NUMBER, 
                                      order_num         IN NUMBER, 
                                      shipping_priority IN NUMBER, 
                                      cust_state        IN VARCHAR2, 
                                      cust_country      IN VARCHAR2, 
                                      cust_region       IN VARCHAR2, 
                                      cust_ord_typ      IN VARCHAR2) AS 
 
OE_enq_order_data        BOLADM.order_typ; 
OE_enq_cust_data         BOLADM.customer_typ; 
OE_enq_book_data         BOLADM.book_typ; 
OE_enq_item_data         BOLADM.orderitem_typ; 
OE_enq_item_list         BOLADM.orderitemlist_vartyp; 
enqopt                   dbms_aq.enqueue_options_t; 
msgprop                  dbms_aq.message_properties_t; 
enq_msgid                RAW(16); 
  
BEGIN 
   msgprop.correlation := cust_ord_typ; 
   OE_enq_cust_data    := BOLADM.customer_typ(NULL, NULL, NULL, NULL, 
                                cust_state, NULL, cust_country); 
   OE_enq_book_data    := BOLADM.book_typ(book_title, NULL, NULL, NULL); 
   OE_enq_item_data    := BOLADM.orderitem_typ(book_qty,  
                                OE_enq_book_data, NULL); 
   OE_enq_item_list    := BOLADM.orderitemlist_vartyp( 
                                BOLADM.orderitem_typ(book_qty,  
                                OE_enq_book_data, NULL)); 
   OE_enq_order_data   := BOLADM.order_typ(order_num, NULL,  
                                cust_ord_typ, cust_region, 
                                OE_enq_cust_data, NULL,  
                                OE_enq_item_list, NULL); 
 
   /*Put the shipping priority into message property before enqueuing 
     the message: */
   msgprop.priority    := shipping_priority; 
   dbms_aq.enqueue('OE.OE_bookedorders_que', enqopt, msgprop,  
                        OE_enq_order_data, enq_msgid); 
   COMMIT; 
END; 
/ 
 
 
/* At each region, similar booked order queues are created. The orders are 
 propagated from the central Order Entry's booked order queues to the regional 
 booked order queues. For example, at the western region, the booked orders 
 queue is created. 
 Create a priority queue table for WS shipping: */
EXECUTE dbms_aqadm.create_queue_table( -
   queue_table        => 'WS_orders_pr_mqtab', -
   sort_list          => 'priority,enq_time', -
   comment            => 'West Shipping Priority MultiConsumer Orders queue table', -
   multiple_consumers => TRUE, -
   queue_payload_type => 'BOLADM.order_typ', -
   compatible         => '8.1'); 
 
/* Booked orders are stored in the priority queue table: */ 
EXECUTE dbms_aqadm.create_queue ( -
   queue_name         => 'WS_bookedorders_que', -
   queue_table        => 'WS_orders_pr_mqtab'); 
 
/* At each region, the shipping application dequeues orders from the regional 
 booked order queue according to the orders' shipping priorities, processes 
 the orders, and enqueues the processed orders into the shipped orders queues 
 or the back orders queues. */

Visual Basic (OO4O): Example Code

Dim OraSession as object
Dim OraDatabase as object
Dim OraAq as object
Dim OraMsg as Object
Dim OraOrder,OraCust,OraBook,OraItem,OraItemList as Object
Dim Msgid as String

   Set OraSession = CreateObject("OracleInProcServer.XOraSession")
   Set OraDatabase = OraSession.DbOpenDatabase("dbname", "user/pwd", 0&)
   Set OraAq = OraDatabase.CreateAQ("OE.OE_bookedorders_que")
   Set OraMsg = OraAq.AQMsg(ORATYPE_OBJECT, "BOLADM.order_typ")
   Set OraOrder = OraDatabase.CreateOraObject("BOLADM.order_typ")
   Set OraCust = OraDatabase.CreateOraObject("BOLADM.Customer_typ")
   Set OraBook = OraDatabase.CreateOraObject("BOLADM.book_typ")
   Set OraItem = OraDatabase.CreateOraObject("BOLADM.orderitem_typ")
   Set OraItemList = OraDatabase.CreateOraObject("BOLADM.orderitemlist_vartyp")

   ' Get the values of cust_state,cust_country etc from user(form_based
   ' input) and then a cmd_click event for Enqueue  
   ' will execute the subroutine order_enq.
   Private Sub Order_enq()
   
   OraMsg.correlation = txt_correlation
   'Initialize the customer details 
   OraCust("state") = txt_cust_state
   OraCust("country") = txt_cust_country
   OraBook("title") = txt_book_title
   OraItem("quantity") = txt_book_qty
   OraItem("item") = OraBook
   OraItemList(1) = OraItem
   OraOrder("orderno") = txt_order_num
   OraOrder("ordertype") = txt_cust_order_typ
   OraOrder("orderregion") = cust_region
   OraOrder("customer") = OraCust
   OraOrder("items") = OraItemList
        
   'Put the shipping priority into message property before enqueuing 
   '  the message: 
   OraMsg.priority = priority
   OraMsg = OraOrder
   Msgid = OraAq.enqueue

   'Release all allocations 
   End Sub

Java (JDBC): Example Code

public static void createPriorityQueueTable(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         pr_mq_table;   
    AQQueueProperty      q_prop;
    AQQueue              bookedorders_q;        

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        /* Create a priority queue table for OE */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Order Entry Priority " + 
                            "MultiConsumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);

        mqt_prop.setSortOrder("priority,enq_time");

        pr_mq_table = aq_sess.createQueueTable("OE", "OE_orders_pr_mqtab", 
                                            mqt_prop);
     
        /* Create a Queue in this queue table */
        q_prop = new AQQueueProperty();
        
        bookedorders_q = aq_sess.createQueue(pr_mq_table, 
                                             "OE_bookedorders_que", q_prop);
        
        /* Enable enqueue and dequeue on the queue */
        bookedorders_q.start(true, true);
  
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }
}

 
/* When an order arrives, the order entry application can use the following 
   procedure to enqueue the order into its booked orders queue. A shipping 
   priority is specified for each order 
 */
public static void order_enqueue(Connection db_conn, String book_title,
                                 double book_qty, double order_num, 
                                 int ship_priority, String cust_state,
                                 String cust_country, String cust_region,
                                 String cust_order_type)
{
    AQSession         aq_sess;
    AQQueue           bookedorders_q;   
    Order             enq_order;
    Customer          cust_data;
    Book              book_data;
    OrderItem         item_data;
    OrderItem[]       items;
    OrderItemList     item_list;
    AQEnqueueOption   enq_option;
    AQMessageProperty m_property;
    AQMessage         message;
    AQObjectPayload   obj_payload;
    byte[]            enq_msg_id;

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);

        cust_data = new Customer();
        cust_data.setCountry(cust_country);
        cust_data.setState(cust_state);
  
        book_data = new Book();
        book_data.setTitle(book_title);

        item_data = new OrderItem();
        item_data.setQuantity(new BigDecimal(book_qty));
        item_data.setItem(book_data);

        items = new OrderItem[1];
        items[0] = item_data;

        item_list = new OrderItemList(items);

        enq_order = new Order();
        enq_order.setCustomer(cust_data);
        enq_order.setItems(item_list);
        enq_order.setOrderno(new BigDecimal(order_num));
        enq_order.setOrdertype(cust_order_type);

        bookedorders_q = aq_sess.getQueue("OE", "OE_bookedorders_que");
  
        message = bookedorders_q.createMessage();

        /* Put the shipping priority into message property before enqueuing */ 
        m_property = message.getMessageProperty();

        m_property.setPriority(ship_priority);

        obj_payload = message.getObjectPayload();

        obj_payload.setPayloadData(enq_order);

        enq_option = new AQEnqueueOption();

        /* Enqueue the message */
        enq_msg_id = bookedorders_q.enqueue(enq_option, message);

        db_conn.commit();

    }
    catch (AQException aq_ex)
    {
        System.out.println("AQ Exception: " + aq_ex); 
    }
    catch (SQLException sql_ex)
    {
        System.out.println("SQL Exception: " + sql_ex); 
    }

}
 
/* Similar booked orders queues are created at each region. Orders are    
   propagated from the central Order Entry booked orders queue to the 
   regional booked orders queues.
   For example, the following method creates a priority queue table and a 
   booked orders queue for Western Shipping (WS).
 */
public static void createWesternShippingQueueTable(Connection db_conn)
{
    AQSession            aq_sess;
    AQQueueTableProperty mqt_prop;
    AQQueueTable         mq_table;      
    AQQueueProperty      q_prop;
    AQQueue              bookedorders_q;        

    try
    {

        /* Create an AQ Session: */
        aq_sess = AQDriverManager.createAQSession(db_conn);


        /* Create a priority queue table for WS */
        mqt_prop = new AQQueueTableProperty("BOLADM.order_typ");
        mqt_prop.setComment("Western Shipping Priority " + 
                            "MultiConsumer Orders queue table");
        mqt_prop.setCompatible("8.1");
        mqt_prop.setMultiConsumer(true);
        mqt_prop.setSortOrder("priority,enq_time");

        mq_table = aq_sess.createQueueTable("WS", "WS_orders_pr_mqtab", 
                                            mqt_prop);

        /* Booked orders are stored in the priority queue table: */      
        q_prop = new AQQueueProperty();

        bookedorders_q = aq_sess.createQueue(mq_table, "WS_bookedorders_que", 
                                             q_prop);

        /* Start the queue */
        bookedorders_q.start(true, true);
  
    }
    catch (AQException ex)
    {
        System.out.println("AQ Exception: " + ex); 
    }

  /* At each region, the shipping application dequeues orders from the 
     regional booked order queue according to the orders' shipping priorities,
     processes the orders, and enqueues the processed orders into the shipped 
     orders queues or the back orders queues. 
   */
}
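
The dequeue order that results from the sort order "priority,enq_time" can be illustrated with a small in-memory model. This is only a sketch and does not use the oracle.AQ API: the class PriorityOrderSketch, its BookedOrder type, and the enqueue sequence counter that stands in for the enqueue time are all hypothetical names introduced for illustration. It also assumes that a smaller priority value is dequeued first.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

/* Toy model of a queue sorted by "priority,enq_time". Not the oracle.AQ API. */
class PriorityOrderSketch {

    /* One booked order: its number, shipping priority, and enqueue sequence */
    static final class BookedOrder {
        final String orderNo;
        final int    priority;
        final long   enqSeq;   /* stands in for the enqueue time */

        BookedOrder(String orderNo, int priority, long enqSeq) {
            this.orderNo  = orderNo;
            this.priority = priority;
            this.enqSeq   = enqSeq;
        }
    }

    /* Assumption: smaller priority value first, then earlier enqueue first */
    private final PriorityQueue<BookedOrder> queue = new PriorityQueue<>(
        Comparator.comparingInt((BookedOrder o) -> o.priority)
                  .thenComparingLong(o -> o.enqSeq));

    private long nextSeq = 0;

    /* Enqueue an order with the given shipping priority */
    void enqueue(String orderNo, int priority) {
        queue.add(new BookedOrder(orderNo, priority, nextSeq++));
    }

    /* Dequeue the next order number, or null if the queue is empty */
    String dequeue() {
        BookedOrder next = queue.poll();
        return (next == null) ? null : next.orderNo;
    }

    public static void main(String[] args) {
        PriorityOrderSketch q = new PriorityOrderSketch();
        q.enqueue("1001", 3);
        q.enqueue("1002", 1);
        q.enqueue("1003", 1);

        /* Orders with equal priority come out in enqueue order */
        String orderNo;
        while ((orderNo = q.dequeue()) != null) {
            System.out.println("Dequeued order " + orderNo);
        }
    }
}
```

In this model, orders 1002 and 1003 share the same priority, so the enqueue sequence decides between them, and order 1001 is dequeued last.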

Time Specification: Delay

AQ supports delayed delivery of messages: when enqueuing a message, the enqueuer can specify a delay interval, that is, a period during which the message cannot be retrieved by a dequeue call (see "Enqueuing a Message [Specify Message Properties]" in Chapter 11, "Operational Interface: Basic Operations"). The message is marked as available to dequeuers only after the delay interval has elapsed.
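
The effect of a delay interval can be illustrated with a small in-memory model. This is only a sketch and does not use the oracle.AQ API: the class DelayedQueueSketch and its methods are hypothetical, and time is passed in explicitly as a number of seconds so that the behavior is easy to follow. In the Java API used in the preceding examples, the delay would instead be set on the message's AQMessageProperty before the enqueue call (a setDelay method taking seconds, to be verified against the API reference).

```java
import java.util.ArrayList;
import java.util.List;

/* Toy model of delayed message visibility. Not the oracle.AQ API. */
class DelayedQueueSketch {

    /* A message and the earliest time (in seconds) it can be dequeued */
    static final class Msg {
        final String payload;
        final long   availableAt;

        Msg(String payload, long availableAt) {
            this.payload     = payload;
            this.availableAt = availableAt;
        }
    }

    private final List<Msg> messages = new ArrayList<>();

    /* Enqueue at time 'now' with a delay in seconds; 0 means no delay */
    void enqueue(String payload, long now, long delaySeconds) {
        messages.add(new Msg(payload, now + delaySeconds));
    }

    /* Return the first message whose delay has elapsed at time 'now',
       or null if no message is available yet */
    String dequeue(long now) {
        for (int i = 0; i < messages.size(); i++) {
            if (messages.get(i).availableAt <= now) {
                return messages.remove(i).payload;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        DelayedQueueSketch q = new DelayedQueueSketch();

        /* Both messages are enqueued at time 100; the first has a 30 second delay */
        q.enqueue("delayed order", 100L, 30L);
        q.enqueue("immediate order", 100L, 0L);

        System.out.println("At 100: " + q.dequeue(100L));
        System.out.println("At 129: " + q.dequeue(129L));
        System.out.println("At 130: " + q.dequeue(130L));
    }
}
```

In this model, the message enqueued without a delay is available immediately, while the delayed message stays invisible to dequeue calls until 30 seconds after it was enqueued.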

Whe