とあるエンジニア系ライダーの日常

Webエンジニアの話、バイクの話

ActiveMQのTransactionに関して検証してみた

ActiveMQのTransactionに関して検証してみた。

通知とトランザクションに関して

JMSでは「通知」と「トランザクション」の二つの方法がある。

通知

プロデューサー(送信者) → JMSサーバー へ送られた時、
JMSサーバー → プロデューサー(送信者)へと受け取った通知を渡して完了する。

JMSサーバー → コンシューマー(受信者)へ送られた時、
コンシューマー(受信者)→ JMSサーバー へと受け取った通知を渡して完了する。

ActiveMQにはこれらに関して3つのモードがある。

  • autoモード

メッセージの配信の通知をJMSサーバーが自動的に処理してくれます。一回だけの配信を保証します。
createSession(false,Session.AUTO_ACKNOWLEDGE)で設定します。

  • clientモード

確認応答をJMSサーバーではなくJMSクライアントが行うモードです。
createSession(false,Session.CLIENT_ACKNOWLEDGE)で設定します。

  • duplicates okeyモード

自動的に確認応答を行ってくれます。数通受け取った場合通知を行うため、autoモードと違い同じメッセージが何通も配信されることがあります。必ず一回だけを保証するautoモードと比べるとその点を保証しない分処理量が少ないというメリットがあります。
createSession(false,Session.DUPS_OK_ACKNOWLEDGE)で設定します。


トランザクション

処理を確定するコミット、処理を復元するロールバックなどを行うことが出来る。
トランザクションモードではコンシューマ/プロデューサーがコミットを行えば、
JMSサーバに対してメッセージ配信が完了したことを通知する。

コンシューマがロールバックを行えば受信されたことをキャンセルし、
再配信を要求することになる。

プロデューサーがロールバックを行えばメッセージ送信をキャンセルする。
ActiveMQでは、

createSession(true,-1)で設定します。

※第二引数は使用しないので、trueにした時点で通知は関係なくなる。

ということで、

トランザクションを検証してみた

コードを全部書くと長いのでわかりそうなレベルで省略しています。
送受信の購読するキューは1つで。
設定は createSession(true,-1)

検証1:トランザクションとコミットが効く

sender.send("test1"); 

コミット前は読めない

message = (TextMessage) receiver.receive(1000);
assertNull(message);

コミット後は読める

sender.commit();
message = (TextMessage) qmForRecieve.receive(1000);
assertThat(message.getText(), is("test1"));

受信側がコミットしないと消えない

message = (TextMessage) receiver.receive(1000);
assertThat(message.getText(), is("test1"));  未だ読める

receiver.commit();
message = (TextMessage) receiver.receive(1000);
assertNull(message); 消える

検証2:トランザクションロールバックが効く

sender.send("test1"); 

ロールバック前は読めない

message = (TextMessage) receiver.receive(1000);
assertNull(message);

ロールバックすると読めない

qm.rollback();

message = (TextMessage) receiver.receive(1000);
assertNull(message);

検証3:他の受信者が使ってるメッセージは読めない

sender.send("test1");

sender.commit();

ある受信者が読んでいる間は
message = (TextMessage) receiver01.receive(1000);
assertThat(message.getText(), is("test1"));

別の受信者は読めない
message = (TextMessage)  receiver02.receive(1000);
assertNull(message);

でもある受信者がロールバックして終了すると

receiver01.rollback();

別の受信者が読める
message = (TextMessage)  receiver02.receive(1000);message = null;
assertThat(message.getText(), is("test1"));
receiver02.commit();

検証4:別のメッセージでも読めない

sender.send("test1");

sender.send("test2");

sender.commit();

ある受信者が読んでいる間は
message = (TextMessage) receiver01.receive(1000);
assertThat(message.getText(), is("test1"));

別の受信者は別のメッセージでも読めない
message = (TextMessage)  receiver02.receive(1000);
assertNull(message);

でもある受信者が終了すると

receiver01.commit();

別の受信者が読める
message = (TextMessage)  receiver02.receive(1000);
assertThat(message.getText(), is("test2"));
receiver02.commit();

検証5:別のメッセージは書ける

sender01.send("test1");

sender02.send("test2");

sender02のみコミットする

sender02.commit()

Sender01のコミットを待たずに読める
message = (TextMessage) receiver.receive(1000);
assertThat(message.getText(), is("test2")); 

 receiver.commit();

sender01をコミットする

sender01.commit(); 

 

Sender01のコミットも読める(02のコミットは先ほど消費した)
message = (TextMessage) receiver.receive(1000);

assertThat(message.getText(), is("test1"));  

receiver.commit();

結論

Transactionがちゃんと効く!!!

送信側にも受信側にもトランザクションがあるっておもしろいなー。

でも、送信は別のトランザクション待たずに送信するけど、

受信は別のトランザクション待たないと受信できないんだなーと。

UT間違えたかな((((;゚Д゚))))ガクガクブルブル