如何在SSM框架中实现ActiveMQ消息的发布及消息订阅
1.首先在applicationContext.xml中增加消息发送与接收的配置,如下:
<!-- Message queue destination: point-to-point (P2P) messaging model -->
<bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg index="0" value="msgQueue" />
</bean>

<!-- Message sender -->
<bean id="jmsSender" class="com.ims.interfaces.jms.activeMQ.JmsSenderImpl">
    <property name="jmsTemplate" ref="jmsTemplate" />
    <property name="msgQueue" ref="msgQueue"/>
</bean>

<!-- Asynchronous message receiver: listener container bound to msgQueue -->
<bean id="msgListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="jmsConnectionFactory" />
    <property name="destination" ref="msgQueue" />
    <property name="messageListener" ref="msgListener"/>
</bean>
2.定义消息发送接口JmsSender.java
package com.ims.interfaces.jms.activeMQ;

/**
 * Contract for sending a {@link MsgContext}.
 */
public interface JmsSender {

    /**
     * Sends the given message context.
     *
     * @param context the message context to send
     */
    void send(MsgContext context);
}
3.消息发送的实现类
package com.ims.interfaces.jms.activeMQ;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
public class JmsSenderImpl implements JmsSender{
private JmsTemplate jmsTemplate;
/**
* 消息通告队列,表示点到点通讯模型
*/
private Destination msgQueue;
/**
* 发送消息
* @param context 消息上下文
*/
public void send(final MsgContext context) {
getJmsTemplate().send(getMsgQueue(), new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
final ObjectMessage message = session.createObjectMessage();
message.setObject(context);
return message;
}
});
}
public JmsTemplate getJmsTemplate() {
return jmsTemplate;
}
public void setJmsTemplate(JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public Destination getMsgQueue() {
return msgQueue;
}
public void setMsgQueue(Destination msgQueue) {
this.msgQueue = msgQueue;
}
}
4.消息上下文
package com.ims.interfaces.jms.activeMQ;
import java.io.Serializable;
import java.util.Collection;
/**
 * Serializable holder for one outgoing message: its text, its recipients
 * and the modes it should be sent by. All fields are set once in the constructor.
 */
public class MsgContext implements Serializable{

    private static final long serialVersionUID = -7877770277875813295L;

    /** The message text to send. */
    private final String message;

    /** The recipients of the message. */
    private final Collection<String> users;

    /** The modes by which the message is sent. */
    private final Collection<String> modes;

    /**
     * Creates a message context. The collections are stored as given (not copied).
     *
     * @param users   the recipients of the message
     * @param message the message text
     * @param modes   the sending modes
     */
    public MsgContext(Collection<String> users, String message,
            Collection<String> modes){
        this.message = message;
        this.users = users;
        this.modes = modes;
    }

    /** @return the message text */
    public String getMessage() {
        return this.message;
    }

    /** @return the recipients, exactly as passed to the constructor */
    public Collection<String> getUsers() {
        return this.users;
    }

    /** @return the sending modes, exactly as passed to the constructor */
    public Collection<String> getModes() {
        return this.modes;
    }
}
5.MsgListener.java,消息监听、执行器
package com.ims.interfaces.jms.activeMQ;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import org.springframework.jms.support.JmsUtils;
/**
 * JMS listener that receives {@link ObjectMessage}s carrying a {@link MsgContext}
 * and acts on them.
 */
public class MsgListener implements MessageListener{

    /**
     * Called when a message is received. Messages that are null, are not
     * {@link ObjectMessage}s, or do not carry a {@link MsgContext} payload are
     * skipped instead of causing a runtime exception inside the listener.
     *
     * @param message the received JMS message; may be null
     */
    @Override
    public void onMessage(Message message) {
        if (message == null) {
            return;
        }
        // Guard the cast: other JMS message types may be delivered to this listener.
        if (!(message instanceof ObjectMessage)) {
            System.err.println("Ignoring unsupported JMS message type: " + message.getClass().getName());
            return;
        }
        final ObjectMessage objectMessage = (ObjectMessage) message;

        // Extract the payload from the object message.
        final Object payload;
        try {
            payload = objectMessage.getObject();
        } catch (JMSException e) {
            // Report the converted exception rather than discarding it silently;
            // it is not rethrown so the listener keeps its best-effort behavior.
            System.err.println("Failed to read JMS message payload: "
                    + JmsUtils.convertJmsAccessException(e));
            return;
        }

        // A null or foreign payload cannot be processed; check before dereferencing.
        if (!(payload instanceof MsgContext)) {
            return;
        }
        final MsgContext context = (MsgContext) payload;

        System.out.println(context.getMessage());
        if (context.getModes() == null || context.getModes().isEmpty()) {
            return;
        }
        // With a valid message context in hand, custom handling can be performed here.
    }
}
6.消息发送controller方法
/**
 * Handles the "jmssend" request: wraps the {@code msg} request parameter in a
 * {@link MsgContext} with empty recipient and mode collections and passes it
 * to the "jmsSender" service.
 *
 * @param msg the message text taken from the request
 */
@RequestMapping("jmssend")
public void send(@RequestParam String msg){
    Collection<String> recipients = new HashSet<String>();
    Collection<String> sendModes = new HashSet<String>();
    JmsSender sender = ServiceLocator.getService("jmsSender");
    sender.send(new MsgContext(recipients, msg, sendModes));
}