It seems that I can’t find a solution to my problem, I’m stuck with this for hours.
I am using Oracle AQs:
Dbms_Aqadm.Create_Queue_Table(Queue_Table => 'ITEM_EVENT_QT', Queue_Payload_Type => 'ITEM_EVENT', Multiple_Consumers => TRUE); Dbms_Aqadm.Create_Queue(Queue_Name => 'ITEM_EVENT_QUEUE', Queue_Table => 'ITEM_EVENT_QT', Max_Retries => 5, Retry_Delay => 0, Retention_Time => 432000, -- 5 DAYS Dependency_Tracking => FALSE, COMMENT => 'Item Event Queue'); -- START THE QUEUE Dbms_Aqadm.Start_Queue('ITEM_EVENT_QUEUE'); -- GRANT QUEUE PRIVILEGES Dbms_Aqadm.Grant_Queue_Privilege(Privilege => 'ALL', Queue_Name => 'ITEM_EVENT_QUEUE', Grantee => 'PUBLIC', Grant_Option => FALSE); END;
Here is one of my subscribers:
Dbms_Aqadm.Add_Subscriber(Queue_Name => 'ITEM_EVENT_QUEUE', Subscriber => Sys.Aq$_Agent('ITEM_SUBSCRIBER_1', NULL, NULL), rule => 'tab.user_data.header.thread_no = 1'); Dbms_Aq.Register(Sys.Aq$_Reg_Info_List(Sys.Aq$_Reg_Info('ITEM_EVENT_QUEUE:ITEM_SUBSCRIBER_1', Dbms_Aq.Namespace_Aq, 'plsql://ITEM_API.GET_QUEUE_FROM_QUEUE', HEXTORAW('FF'))),1);
Subscriber Registration:
Whenever a specific event occurs in my database, I use a trigger to add an “event” to my AQ, calling the following procedure from my ITEM_API package:
PROCEDURE ADD_EVENT_TO_QUEUE(I_EVENT IN ITEM_EVENT, O_STATUS_CODE OUT VARCHAR2, O_ERROR_MSG OUT VARCHAR2) IS ENQUEUE_OPTIONS DBMS_AQ.ENQUEUE_OPTIONS_T; MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T; MESSAGE_HANDLE RAW(16); EVENT ITEM_EVENT; HEADER_PROP HEADER_PROPERTIES; BEGIN EVENT := I_EVENT; EVENT.SEQ_NO := ITEM_EVENT_SEQ.NEXTVAL; ENQUEUE_OPTIONS.VISIBILITY := DBMS_AQ.ON_COMMIT; ENQUEUE_OPTIONS.SEQUENCE_DEVIATION := NULL; MESSAGE_PROPERTIES.PRIORITY := 1; MESSAGE_PROPERTIES.DELAY := DBMS_AQ.NO_DELAY; MESSAGE_PROPERTIES.EXPIRATION := DBMS_AQ.NEVER; HEADER_PROP := HEADER_PROPERTIES(1); EVENT.HEADER := HEADER_PROP; DBMS_AQ.ENQUEUE(QUEUE_NAME => 'ITEM_EVENT_QUEUE', ENQUEUE_OPTIONS => ENQUEUE_OPTIONS, MESSAGE_PROPERTIES => MESSAGE_PROPERTIES, PAYLOAD => EVENT, MSGID => MESSAGE_HANDLE); EXCEPTION WHEN OTHERS THEN ERROR_HANDLER.LOG_ERROR(NULL, EVENT.ITEM, EVENT.SEQ_NO, SQLCODE, SQLERRM, O_STATUS_CODE, O_ERROR_MSG); RAISE; END ADD_EVENT_TO_QUEUE;
And it works, because when I check my AQ table, I can find the “event”, however my dequeue method does not go around, as you can see in the image below, there is no DEQ_TIME .
Here is my dequeue method, also from my ITEM_API package:
PROCEDURE GET_QUEUE_FROM_QUEUE(CONTEXT RAW, REGINFO SYS.AQ$_REG_INFO, DESCR SYS.AQ$_DESCRIPTOR, PAYLOAD RAW, PAYLOADL NUMBER) IS R_DEQUEUE_OPTIONS DBMS_AQ.DEQUEUE_OPTIONS_T; R_MESSAGE_PROPERTIES DBMS_AQ.MESSAGE_PROPERTIES_T; V_MESSAGE_HANDLE RAW(16); I_PAYLOAD ITEM_EVENT; L_PROC_EVENT BOOLEAN; O_TARGETS CFG_EVENT_STAGE_TBL; O_ERROR_MSG VARCHAR2(300); O_STATUS_CODE VARCHAR2(100); BEGIN R_DEQUEUE_OPTIONS.MSGID := DESCR.MSG_ID; R_DEQUEUE_OPTIONS.CONSUMER_NAME := DESCR.CONSUMER_NAME; R_DEQUEUE_OPTIONS.DEQUEUE_MODE := DBMS_AQ.REMOVE; --R_DEQUEUE_OPTIONS.WAIT := DBMS_AQ.NO_WAIT; DBMS_AQ.DEQUEUE(QUEUE_NAME => DESCR.QUEUE_NAME, DEQUEUE_OPTIONS => R_DEQUEUE_OPTIONS, MESSAGE_PROPERTIES => R_MESSAGE_PROPERTIES, PAYLOAD => I_PAYLOAD, MSGID => V_MESSAGE_HANDLE); IF I_PAYLOAD IS NOT NULL THEN L_PROC_EVENT := PROCESS_EVENT(I_PAYLOAD, O_TARGETS, O_STATUS_CODE, O_ERROR_MSG); END IF; EXCEPTION WHEN OTHERS THEN ERROR_HANDLER.LOG_ERROR(NULL, NULL, NULL, SQLCODE, SQLERRM, O_STATUS_CODE, O_ERROR_MSG); RAISE; END GET_QUEUE_FROM_QUEUE;
Am I doing something wrong? How can i fix this? I think there might be a problem with my subscription, but I'm not sure.
EDIT: I just realized that if I delete the subscribers and the register, and then add them again, they deactivate all the messages. Howerver, if another event enters the queue, it remains undefined (or until I delete and add subscribers):
A record with status 0 and no DEQ_TIME is new.
Do I need a planner or something like that?
EDIT: I added the distribution of my scheduler to AQ:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE');
and even added a next_time field:
DBMS_AQADM.SCHEDULE_PROPAGATION('ITEM_EVENT_QUEUE', SYSDATE + 30/86400);
Still not working. Any suggestions? I assume that AQ notifications do not work and my callback procedure is never called. How can i fix this?
EDIT: I removed my procedure from the package for testing purposes only, so my teammates can compile the ITEM_API package (I don’t know whether to recompile the package, whether or not they affect the detection process). Still not working.