JMS mit Spring und Oracle Advanced Queuing

In jedem Software Projekt, in dem Oracle Datenbank und Spring Framework zusammen verwendet werden, kann mit Hilfe von „Oracle Advanced Queueing“ auf einfache Art und Weise ein fast vollständiger JMS-Service aufgebaut werden. Diese Funktionalität ist in Oracle Datenbank „Out-of-the-Box“ vorhanden und einsatzbereit.

Für die Verwendung von JMS im Oracle werden folgende Bibliotheken benötigt:

WARNUNG: Oracle AQ API kann nur zusammen mit Oracle Universal Connection Pool verwendet werden, leider funktioniert es nicht mit dem BasicDataSource von Apache Commons (wundert mich nicht so sehr), dafür bietet der Oracle UCP ein Paar wirklich interessante und nützliche Funktionen wie Failover und Caching. Außerdem kann das UCP mit beliebigen JDBC Datenbanken verwendet werden (z.B. MySQL).

Es werden zwei Destination Factories benötigt, ich habe sie als einen abstrakten OracleAqFactoryBean und zwei Ableitungen davon – OracleAqQueueFactoryBean und OracleAqTopicFactoryBean implementiert.
Oracle AQ unterscheidet Destination Typen, wenn Sie ein „Oracle Queue“ als „Multiple_consumers => TRUE“ definieren, werden Sie den OracleAqTopicFactoryBean verwenden müssen, und umgekehrt, für ein „Multiple_consumers => FALSE“ ist ein OracleAqQueueFactoryBean nötig.

public class OracleAqFactoryBean {
    private ConnectionFactory connectionFactory;
    private String oracleQueueName = null;
    private String oracleQueueUser = null;    

    @Required
    public void setConnectionFactory(final ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }    

    public String getOracleQueueName() {
        return oracleQueueName;
    }    

    @Required
    public void setOracleQueueName(final String oracleQueueName) {
        this.oracleQueueName = oracleQueueName;
    }    

    public String getOracleQueueUser() {
        return oracleQueueUser;
    }    

    public void setOracleQueueUser(final String oracleQueueUser) {
        this.oracleQueueUser = oracleQueueUser;
    }    

    public boolean isSingleton() {
        return false;
    }    

    protected AQjmsSession getSession() throws JMSException {
        // Oracle AQ sessions seems to be always session transacted!
        final AQjmsSession session = (AQjmsSession) connectionFactory.createConnection().createSession(true, Session.SESSION_TRANSACTED);
        return session;
    }
}

public class OracleAqQueueFactoryBean extends OracleAqFactoryBean implements FactoryBean {    
    public Class getObjectType() {
        return javax.jms.Queue.class;
    }    

    public Queue getObject() throws JMSException {
        final AQjmsSession session = getSession();
        return session.getQueue(getOracleQueueUser(), getOracleQueueName());
    }    
}

public class OracleAqTopicFactoryBean extends OracleAqFactoryBean implements FactoryBean {    
    public Class getObjectType() {
        return javax.jms.Topic.class;
    }    

    public Topic getObject() throws Exception {
        final AQjmsSession session = getSession();
        return session.getTopic(getOracleQueueUser(), getOracleQueueName());
    }    

}

Für den Oracle Benutzer, der die Oracle Advanced Queuing verwenden wird, müssen entsprechende Rechte erteilt werden, ich habe für den Test einen „JMSUSER“ angelegt.

grant execute on sys.dbms_aqadm to jmsuser;
grant execute on sys.dbms_aq to jmsuser;
grant execute on sys.dbms_aqin to jmsuser;
grant execute on sys.dbms_aqjms to jmsuser;

begin 
  DBMS_AQADM.GRANT_TYPE_ACCESS('jmsuser'); --  Only for topics!!!
end;

begin 
  dbms_aqadm.grant_system_privilege('ENQUEUE_ANY','jmsuser');
  dbms_aqadm.grant_system_privilege('DEQUEUE_ANY','jmsuser');
end; 

Jetzt können wir das Queue in der Datenbank anlegen.
Das Payload Type vom Queue legt fest, welche Typen von Nachrichten können durch diesen Queue übertragen werden (z.B. ein Queue mit sys.aq$_jms_text_message Payload Type kann NUR String Nachrichten übertragen).

Folgender Script definiert ein Queue mit JMS MapMessage als Nachrichten Typ. Im Fehlerfall, nach 10 Sekunden wird es versucht die Nachricht noch mal zuzustellen, insgesamt wird es maximal 5 Zustellungs-Versuche geben. Falls die Zustellung erfolgreich war, oder 5-mal fehlschlug, wird die Nachricht aus der Tabelle entfernt (retention).

BEGIN 
    DBMS_AQADM.CREATE_QUEUE_TABLE(
        Queue_table        =>'QT_TEST',
        -- sys.aq$_jms_text_message, sys.aq$_jms_map_message, sys.aq$_jms_bytes_message, sys.aq$_jms_object_message
        Queue_payload_type =>'sys.aq$_jms_map_message',
        Storage_clause     => NULL,
        Sort_list          => 'PRIORITY,ENQ_TIME', -- list of queue table fields
        Multiple_consumers => FALSE, -- Queue or Topic!
        Message_grouping   => DBMS_AQADM.NONE, -- DBMS_AQADM.TRANSACTIONAL
        Comment            => 'Test JMS Queue!',
        Primary_instance   => 0, 
        Secondary_instance => 0, 
        Secure             => FALSE
    );
    
    DBMS_AQADM.CREATE_QUEUE(
        Queue_name          => 'Q_TEST',
        Queue_table         => 'QT_TEST',
        Queue_type          =>  DBMS_AQADM.NORMAL_QUEUE,
        Max_retries         =>  5, -- Count of retries
        Retry_delay         =>  10, -- seconds
        Retention_time      =>  0, -- seconds, DBMS_AQADM.INFINITE - retain forever
        dependency_tracking =>  FALSE,
        Comment             =>  NULL
    );

    DBMS_AQADM.START_QUEUE(
        Queue_name            => 'Q_TEST'
    );   
END;

Zusätzliche Informationen über AQ API finden Sie unter folgendem link

Mit diesem Script kann die Queue wieder gelöscht werden (für testzwecke).

begin
    dbms_aqadm.stop_queue('Q_TEST');
    dbms_aqadm.drop_queue('Q_TEST');
    dbms_aqadm.drop_queue_table('QT_TEST');  
end;

Spring Konfiguration für einen JMS Queue (Beim JMS Topic müssen Sie einfach den anderen ConnectionFactory verwenden und entsprechenden Oracle Queue anlegen)

    <bean id="dataSource" class="oracle.ucp.jdbc.PoolDataSourceFactory" factory-method="getPoolDataSource">
        <property name="connectionFactoryClassName" value="oracle.jdbc.pool.OracleDataSource"/>
        <property name="URL" value="jdbc:oracle:thin:@localhost:1521/orcl"/>
        <property name="user" value="jmsuser"/>
        <property name="password" value="jsmpassword"/>
        <property name="connectionPoolName" value="CONN_POOL"/>
        <property name="minPoolSize" value="5"/>
        <property name="maxPoolSize" value="10"/>
        <property name="initialPoolSize" value="5"/>
        <property name="inactiveConnectionTimeout" value="120"/>
        <property name="validateConnectionOnBorrow" value="true"/>
        <property name="SQLForValidateConnection" value="select 1 from dual" />
        <property name="maxStatements" value="10"/>
    </bean>
            
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource"/>
    </bean>
            
    <bean id="jmsQueueConnectionFactory" class="oracle.jms.AQjmsFactory" factory-method="getQueueConnectionFactory" >
        <constructor-arg index="0" ref="dataSource"/>
    </bean>    
        
    <!--  Send messages -->
    <bean id="testQueue" class="foo.OracleAqQueueFactoryBean">
        <property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
        <property name="oracleQueueName" value="Q_TEST"/>
    </bean>    
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
        <property name="defaultDestination" ref="testQueue"/>
    </bean>

    <bean id="messagingService" class="foo.MessagingService">
        <property name="jmsTemplate" ref="jmsTemplate"/>
    </bean>
            
    <!--  Receive messages -->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="jmsQueueConnectionFactory"/>
        <property name="destination" ref="testQueue"/>
        <property name="messageListener" ref="testListener" />
        <property name="sessionTransacted" value="true"/>
        <property name="transactionManager" ref="transactionManager"/>
    </bean>
    <bean id="testListener" class="foo.TestMessageListener"/>

Der Versand kann folgendermassen implementiert werden:

public class MessagingService {
    private static Logger logger = Logger.getLogger(TestMessageListener.class);

    private JmsTemplate jmsTemplate;
    
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }
    
    public void setJmsTemplate(final JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
    
    public void sendMapMessage() {        
        final Map message = new HashMap();
        message.put("1", "First value!");
        message.put("2", "Second value!");
        
        jmsTemplate.convertAndSend(message);        
        
        logger.debug("Sent a message!");
    }
}

Wenn die Nachricht versandt wurde, aber noch nicht konsumiert (kein Listener definiert, es gab ein Fehler bei der Zustellung), oder wenn Retention lang genug bzw. INFINITE gesetzt ist, werden Sie die Nachricht als einen Eintrag in der QT_TEST Tabelle sehen.

So kann der Listener für Empfang von Nachrichten aussehen. (Spring bietet zusätzlich ein MessageListenerAdapter, mit dessen Hilfe Empfang von Nachrichten kann im JavaBean Style implementiert werden)

public class TestMessageListener implements MessageListener {
    private static Logger logger = Logger.getLogger(TestMessageListener.class);
    
    public void onMessage(Message message) {
        if (message instanceof MapMessage) {
            final MapMessage jmsMessage = (MapMessage) message;
            logger.debug("Got a message " + jmsMessage.toString());
        } else {
            throw new IllegalArgumentException("Message must be of type MapMessage");
        }
    }
    
}

Zusätzliche Informationen:

Oracle JMS Introduction

Veröffentlicht in Allgemein, Java, Oracle