Message Middleware Notes, Part 4: Integrating RabbitMQ with Spring

I. Integrating RabbitMQ with Spring

Preparation:

Create two Maven web projects named RabbitMQSpringProducer and RabbitMQSpringConsumer.

Add the following dependencies to pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.study.demo</groupId>
    <artifactId>RabbitMQSpringProducer</artifactId>
    <packaging>war</packaging>
    <version>0.0.1-SNAPSHOT</version>
    <name>RabbitMQSpringProducer Maven Webapp</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-web-api</artifactId>
            <version>7.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>1.7.5</version>
        </dependency>


        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-webmvc</artifactId>
            <version>4.3.11.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>4.3.11.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>javax.servlet</groupId>
            <artifactId>jstl</artifactId>
            <version>1.2</version>
        </dependency>

        <!-- Logging -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.5</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.16</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.0.13</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.0.13</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-access</artifactId>
            <version>1.0.13</version>
        </dependency>

        <!--JSON -->
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-mapper-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jackson</groupId>
            <artifactId>jackson-core-asl</artifactId>
            <version>1.9.13</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.8.4</version>
        </dependency>

        <!-- RabbitMQ -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.0.0.RELEASE</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>RabbitMQSpringProducer</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.3.2</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
        <resources>
            <resource>
                <directory>${basedir}/src/main/java</directory>
                <includes>
                    <include>**/*.xml</include>
                </includes>
            </resource>
        </resources>
    </build>
</project>

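Steps for integrating with Spring:

Add the rabbit namespace to the configuration file:
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd

What to configure in the file:
1) Configure the connection factory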
2) Declare the admin (<rabbit:admin>)
3) Declare the queues
4) Declare the exchanges
5) Bind the queues to the exchanges
6) On the producer side, declare the RabbitTemplate

1. In the RabbitMQSpringProducer project, create the configuration file /RabbitMQSpringProducer/src/main/java/applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation values, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:task="http://www.springframework.org/schema/task"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context-4.0.xsd
        http://www.springframework.org/schema/jee
        http://www.springframework.org/schema/jee/spring-jee-4.0.xsd
        http://www.springframework.org/schema/mvc
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
        http://www.springframework.org/schema/task
        http://www.springframework.org/schema/task/spring-task-4.0.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">


    <!-- RabbitMQ configuration -->
    <bean id="rabbitConnectionFactory"
        class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="127.0.0.1"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"></property>
    </bean>
    <!-- Spring's RabbitMQ admin -->
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- Queue created by the producer -->
    <rabbit:queue name="p_create_queue" durable="false"/>

    <!-- Fanout exchange -->
    <rabbit:fanout-exchange name="fanout-exchange"
        xmlns="http://www.springframework.org/schema/rabbit" durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="p_create_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>

    <!-- Topic exchange -->
    <rabbit:topic-exchange name="topic-exchange"
         xmlns="http://www.springframework.org/schema/rabbit" durable="false">
    </rabbit:topic-exchange>
    <!-- rabbitTemplate: Spring's RabbitMQ message template -->
    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="rabbitConnectionFactory"></constructor-arg>
    </bean>

</beans>  

2. In the RabbitMQSpringProducer project, create the configuration file /RabbitMQSpringProducer/src/main/java/spring-mvc.xml

<?xml version="1.0" encoding="UTF-8"?>  
<!-- For the latest schemaLocation values, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"  
       xmlns:mvc="http://www.springframework.org/schema/mvc"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd   
        http://www.springframework.org/schema/mvc   
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">

<!--    <mvc:default-servlet-handler />-->
    <mvc:resources mapping="/js/**" location="/js/"/>
    <mvc:annotation-driven
            content-negotiation-manager="contentNegotiationManager" />

    <context:component-scan base-package="com.study.demo">
        <context:include-filter type="annotation"
                                expression="org.springframework.stereotype.Controller" />
    </context:component-scan>


    <bean id="stringHttpMessageConverter"
          class="org.springframework.http.converter.StringHttpMessageConverter">
        <property name="supportedMediaTypes">
            <list>
                <bean class="org.springframework.http.MediaType">
                    <constructor-arg index="0" value="text" />
                    <constructor-arg index="1" value="plain" />
                    <constructor-arg index="2" value="UTF-8" />
                </bean>
            </list>
        </property>
    </bean>
    <bean id="mappingJacksonHttpMessageConverter"
          class="org.springframework.http.converter.json.MappingJackson2HttpMessageConverter" />

    <bean
            class="org.springframework.web.servlet.mvc.annotation.AnnotationMethodHandlerAdapter">
        <property name="messageConverters">
            <list>
                <ref bean="stringHttpMessageConverter" />
                <ref bean="mappingJacksonHttpMessageConverter" />
            </list>
        </property>
    </bean>

    <bean id="contentNegotiationManager"
          class="org.springframework.web.accept.ContentNegotiationManagerFactoryBean">
        <property name="mediaTypes">
            <map>
                <entry key="html" value="text/html" />
                <entry key="pdf" value="application/pdf" />
                <entry key="xls" value="application/vnd.ms-excel" />
                <entry key="xml" value="application/xml" />
                <entry key="json" value="application/json" />
            </map>
        </property>
        <property name="defaultContentType" value="text/html" />
    </bean>

    <bean id="viewResolver"
          class="org.springframework.web.servlet.view.ContentNegotiatingViewResolver">
        <property name="order" value="0" />
        <property name="contentNegotiationManager" ref="contentNegotiationManager" />

        <property name="viewResolvers">
            <list>
                <bean class="org.springframework.web.servlet.view.BeanNameViewResolver" />
                <bean
                        class="org.springframework.web.servlet.view.InternalResourceViewResolver">
                    <property name="viewClass"
                              value="org.springframework.web.servlet.view.JstlView" />
                    <property name="prefix" value="/WEB-INF/pages/" />
                    <property name="suffix" value=".jsp"></property>
                </bean>
            </list>
        </property>

        <property name="defaultViews">
            <list>
                <bean
                        class="org.springframework.web.servlet.view.json.MappingJackson2JsonView">
                    <property name="extractValueFromSingleKeyModel" value="true" />
                </bean>
            </list>
        </property>
    </bean>

</beans>  

3. In the RabbitMQSpringProducer project, create a controller that sends messages through the RabbitMQ and Spring integration

package com.study.demo.controller;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * 
 * @Description: Controller that sends messages through the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Controller
@RequestMapping("/rabbitmq")
public class RabbitMqController {

    private Logger logger = LoggerFactory.getLogger(RabbitMqController.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @ResponseBody
    @RequestMapping("/fanoutSender")
    public String fanoutSender(@RequestParam("message")String message){
        String opt="";
        try {
            String str = "Fanout,the message_"+" is : "+message;
            logger.info("**************************Send Message:["+str+"]");
            rabbitTemplate.send("fanout-exchange","",
                    new Message(str.getBytes(),new MessageProperties()));

            opt = "suc";
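        } catch (Exception e) {
            logger.error("Failed to send fanout message", e);
            // getCause() can be null, so fall back to the exception itself
            opt = e.getCause() != null ? e.getCause().toString() : e.toString();
        }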
        return opt;
    }

    @ResponseBody
    @RequestMapping("/topicSender")
    public String topicSender(@RequestParam("message")String message){
        String opt="";
        try {
            String[] severities={"error","info","warning"};
            String[] modules={"email","order","user"};
            for(int i=0;i<severities.length;i++){
                for(int j=0;j<modules.length;j++){
                    String routeKey = severities[i]+"."+modules[j];
                    String str = "the message is [rk:"+routeKey+"]["+message+"]";
                    rabbitTemplate.send("topic-exchange",routeKey,
                            new Message(str.getBytes(),new MessageProperties()));

                }
            }
            opt = "suc";
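        } catch (Exception e) {
            logger.error("Failed to send topic message", e);
            // getCause() can be null, so fall back to the exception itself
            opt = e.getCause() != null ? e.getCause().toString() : e.toString();
        }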
        return opt;
    }

}
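
The topicSender method above publishes one message for every severity/module combination, so each request produces nine messages. The following stand-alone sketch reproduces only the routing-key construction and has no RabbitMQ dependency; the class name RoutingKeyDemo and the method buildRoutingKeys are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class RoutingKeyDemo {

    // Same severity and module values as in topicSender.
    static final String[] SEVERITIES = {"error", "info", "warning"};
    static final String[] MODULES = {"email", "order", "user"};

    // Builds every "<severity>.<module>" routing key in the order the controller sends them.
    static List<String> buildRoutingKeys() {
        List<String> keys = new ArrayList<>();
        for (String severity : SEVERITIES) {
            for (String module : MODULES) {
                keys.add(severity + "." + module);
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Print one routing key per line.
        for (String key : buildRoutingKeys()) {
            System.out.println(key);
        }
    }
}
```

The list starts with error.email and ends with warning.user, which is the order in which the nested loops in topicSender call rabbitTemplate.send.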

4. In the RabbitMQSpringProducer project, create the message-sending page /RabbitMQSpringProducer/src/main/webapp/index.jsp

<%@ page language="java" import="java.util.*" pageEncoding="UTF-8"%>
<%
    String path = request.getContextPath();
    System.out.println(path);
    String basePath = request.getScheme() + "://"
            + request.getServerName() + ":" + request.getServerPort()
            + path + "/";
    System.out.println(basePath);
%>

<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN">
<html>
<head>
<base href="<%=basePath%>">

<title>RabbitMQ Demo</title>

<meta http-equiv="pragma" content="no-cache">
<meta http-equiv="cache-control" content="no-cache">
<meta http-equiv="expires" content="0">
<script type="text/javascript" src="<%--<%=basePath%>--%>js/jquery-1.11.0.min.js"></script>
<style type="text/css">
.h1 {
    margin: 0 auto;
}

#producer{
    width: 48%;
     border: 1px solid blue; 
    height: 80%;
    align:center;
    margin:0 auto;
}

body{
    text-align :center;
} 
div {
    text-align :center;
}
textarea{
    width:80%;
    height:100px;
    border:1px solid gray;
}
button{
    background-color: rgb(62, 156, 66);
    border: none;
    font-weight: bold;
    color: white;
    height:30px;
}
</style>
<script type="text/javascript">

    function send(controller){
        if($("#message").val()==""){
            $("#message").css("border","1px solid red");
            return;
        }else{
            $("#message").css("border","1px solid gray");
        }
        $.ajax({
            type: 'post',
            url:'<%=basePath%>rabbitmq/'+controller,
            dataType:'text', 
            data:{"message":$("#message").val()},
            success:function(data){
                if(data=="suc"){
                    $("#status").html("<font color=green>Sent successfully</font>");
                    setTimeout(clear,1000);
                }else{
                    $("#status").html("<font color=red>"+data+"</font>");
                    setTimeout(clear,5000);
                }
            },
            error:function(data){
                $("#status").html("<font color=red>ERROR:"+data["status"]+","+data["statusText"]+"</font>");
                setTimeout(clear,5000);
            }

        });
    }

    function clear(){
        $("#status").html("");
    }

</script>
</head>

<body>
    <h1>Hello RabbitMQ</h1>
    <div id="producer">
        <h2>Producer</h2>
        <textarea id="message"></textarea>
        <br>
        <button onclick="send('fanoutSender')">Send Fanout message</button>
        <button onclick="send('topicSender')">Send Topic message</button>
        <br>
        <span id="status"></span>
    </div>
</body>
</html>

5. In the RabbitMQSpringProducer project, create /RabbitMQSpringProducer/src/main/webapp/WEB-INF/web.xml

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">
  <display-name>RabbitMqSpringProducerDemo</display-name>

  <servlet-mapping>
    <servlet-name>default</servlet-name>
    <url-pattern>*.js</url-pattern>
  </servlet-mapping>

  <!-- Spring character-encoding filter: start -->
  <filter>
    <filter-name>characterEncoding</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
      <param-name>forceEncoding</param-name>
      <param-value>true</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>characterEncoding</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>
  <!-- Spring character-encoding filter: end -->

  <!-- Spring Application Context Listener Start -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  <!-- Spring Application Context Listener End -->


  <!-- Spring MVC Config Start -->
  <servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <!-- Filter all resources -->
    <url-pattern>/</url-pattern>
  </servlet-mapping>
  <!-- Spring MVC Config End -->

</web-app>

The producer service code is now complete.

Start RabbitMQSpringProducer in Tomcat v8.5 on port 8080, then open http://localhost:8080/RabbitMQSpringProducer/ in a browser.

98_1.png

98_2.png

6. In the RabbitMQSpringConsumer project, create three fanout consumers

Fanout consumer 1:

package com.study.demo.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Fanout consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class FanoutService_H1 implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(FanoutService_H1.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}

Fanout consumer 2:

package com.study.demo.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Fanout consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class FanoutService_H2 implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(FanoutService_H2.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}

Fanout consumer 3:

package com.study.demo.service.fanout;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Fanout consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class FanoutService_H3 implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(FanoutService_H3.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}
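
All three fanout consumers receive every message because a fanout exchange copies each message to every queue bound to it and ignores the routing key, which is why fanoutSender can pass an empty string. The sketch below is a small in-memory model of that behavior, not RabbitMQ client code. The class FanoutModel and its methods are hypothetical; the queue names are the ones bound to fanout-exchange in the two applicationContext.xml files of this article.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FanoutModel {

    // Queue name -> messages currently held by that queue.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    // Binds a queue to the exchange, creating the queue if it does not exist yet.
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    // A fanout exchange delivers a copy to every bound queue; the routing key is not consulted.
    void publish(String routingKey, String body) {
        for (List<String> messages : queues.values()) {
            messages.add(body);
        }
    }

    // Returns the messages held by the given queue, or null if the queue is not bound.
    List<String> messagesIn(String queueName) {
        return queues.get(queueName);
    }

    // Builds the model with the article's four queues and publishes one message.
    static FanoutModel demo() {
        FanoutModel exchange = new FanoutModel();
        exchange.bind("p_create_queue");
        exchange.bind("h1_queue");
        exchange.bind("h2_queue");
        exchange.bind("h3_queue");
        exchange.publish("", "Fanout,the message_ is : hello");
        return exchange;
    }

    public static void main(String[] args) {
        FanoutModel exchange = demo();
        for (String name : new String[] {"p_create_queue", "h1_queue", "h2_queue", "h3_queue"}) {
            System.out.println(name + " holds " + exchange.messagesIn(name).size() + " message(s)");
        }
    }
}
```

In this model p_create_queue also receives a copy. The listener container in the consumer configuration registers no listener for that queue, so on a real broker its messages would presumably stay queued until something consumes them.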

7. In the RabbitMQSpringConsumer project, create four topic consumers

Topic consumer 1:

package com.study.demo.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Topic consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class AllErrorTopicService implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(AllErrorTopicService.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}

Topic consumer 2:

package com.study.demo.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Topic consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class AllLogTopicService implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(AllLogTopicService.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}

Topic consumer 3:

package com.study.demo.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Topic consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class EmailAllTopicService implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(EmailAllTopicService.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}

Topic consumer 4:

package com.study.demo.service.topic;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

/**
 * 
 * @Description: Topic consumer for the RabbitMQ and Spring integration
 * @author leeSmall
 * @date 2018-09-17
 *
 */
@Component
public class EmailErrorTopicService implements MessageListener{
    private Logger logger = LoggerFactory.getLogger(EmailErrorTopicService.class);
    public void onMessage(Message message) {
        logger.info("Get message:"+new String(message.getBody()));
    }
}
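
All seven consumers decode the body with new String(message.getBody()), and the controller encodes it with str.getBytes(); both calls use the JVM's default charset. If the producer and consumer JVMs were started with different defaults, non-ASCII text could arrive garbled. The stand-alone sketch below illustrates the difference with an explicit charset on both sides; CharsetRoundTrip and its methods are hypothetical names, and using UTF-8 is an assumption, not something the original code specifies.

```java
import java.nio.charset.StandardCharsets;

public class CharsetRoundTrip {

    // Encodes text the way a producer could, with an explicit charset.
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Decodes a message body with the same explicit charset.
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // \u4f60\u597d is a two-character Chinese greeting, written as escapes to keep the source ASCII-only.
        String original = "the message is [rk:error.email][\u4f60\u597d]";
        byte[] body = encode(original);

        // Same charset on both sides: the text survives the round trip.
        System.out.println(decode(body).equals(original));

        // A different charset on the consumer side does not restore the non-ASCII characters.
        String wrong = new String(body, StandardCharsets.ISO_8859_1);
        System.out.println(wrong.equals(original));
    }
}
```

The first line printed is true and the second is false. In the article's code, the same idea would mean passing a charset to getBytes in the controller and to the String constructor in each onMessage method.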

8. In the RabbitMQSpringConsumer project, create the configuration file /RabbitMQSpringConsumer/src/main/java/applicationContext.xml, which holds the RabbitMQ configuration and the consumers' queue listeners

<?xml version="1.0" encoding="UTF-8"?>
<!-- For the latest schemaLocation values, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context-4.0.xsd
           http://www.springframework.org/schema/rabbit
           http://www.springframework.org/schema/rabbit/spring-rabbit-2.0.xsd">

     <!-- Component scan path -->
    <context:component-scan base-package="com.study.demo">
         <context:exclude-filter type="annotation" expression="org.springframework.stereotype.Controller"/>
    </context:component-scan>

    <!-- RabbitMQ configuration -->
    <bean id="rabbitConnectionFactory"
          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="127.0.0.1"/>
        <property name="username" value="guest"/>
        <property name="password" value="guest"/>
        <property name="channelCacheSize" value="8"/>
        <property name="port" value="5672"></property>
    </bean>
    <rabbit:admin connection-factory="rabbitConnectionFactory"/>

    <!-- Fanout exchange: begin -->
    <!-- Declare the queues -->
    <rabbit:queue name="h1_queue" durable="false"/>
    <rabbit:queue name="h2_queue" durable="false"/>
    <rabbit:queue name="h3_queue" durable="false"/>

    <!-- Bind the queues that need the data to the exchange -->
    <rabbit:fanout-exchange name="fanout-exchange"
                            xmlns="http://www.springframework.org/schema/rabbit"
                            durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="h1_queue"></rabbit:binding>
            <rabbit:binding queue="h2_queue"></rabbit:binding>
            <rabbit:binding queue="h3_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:fanout-exchange>
    <!-- Fanout exchange: end -->


    <!-- Topic exchange: begin -->
    <!-- Declare the queues -->
    <rabbit:queue name="all_log_queue" durable="false"/>
    <rabbit:queue name="email_all_queue" durable="false"/>
    <rabbit:queue name="email_error_queue" durable="false"/>
    <rabbit:queue name="all_error_queue" durable="false"/>

    <!-- Bind the queues that need the data to the exchange by routing key -->
    <rabbit:topic-exchange name="topic-exchange"
                           xmlns="http://www.springframework.org/schema/rabbit"
                           durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="all_log_queue" pattern="#"></rabbit:binding>
            <rabbit:binding queue="email_all_queue" pattern="*.email"></rabbit:binding>
            <rabbit:binding queue="email_error_queue"  pattern="error.email"></rabbit:binding>
            <rabbit:binding queue="all_error_queue"  pattern="error.*"></rabbit:binding>

        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- Topic exchange: end -->


    <!-- Listener container -->
    <rabbit:listener-container connection-factory="rabbitConnectionFactory">
        <rabbit:listener ref="fanoutService_H1" queues="h1_queue" method="onMessage" />
        <rabbit:listener ref="fanoutService_H2" queues="h2_queue" method="onMessage" />
        <rabbit:listener ref="fanoutService_H3" queues="h3_queue" method="onMessage" />
        <rabbit:listener ref="allLogTopicService" queues="all_log_queue" method="onMessage" />
        <rabbit:listener ref="emailAllTopicService" queues="email_all_queue" method="onMessage" />
        <rabbit:listener ref="emailErrorTopicService" queues="email_error_queue" method="onMessage" />
        <rabbit:listener ref="allErrorTopicService" queues="all_error_queue" method="onMessage" />
    </rabbit:listener-container>


</beans>  
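
The four topic bindings above decide which queue receives which routing key: in a topic pattern, words are separated by dots, * stands for exactly one word, and # stands for zero or more words. The following stand-alone sketch applies those rules to the patterns from the configuration. It is a simplified matcher written for illustration, not the broker's implementation, and the class TopicBindingDemo and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class TopicBindingDemo {

    // Queue name -> binding pattern, copied from the topic-exchange bindings above.
    static final Map<String, String> BINDINGS = new LinkedHashMap<>();
    static {
        BINDINGS.put("all_log_queue", "#");
        BINDINGS.put("email_all_queue", "*.email");
        BINDINGS.put("email_error_queue", "error.email");
        BINDINGS.put("all_error_queue", "error.*");
    }

    // Returns true if the routing key matches the pattern under the topic rules described above.
    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: succeed only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words of the routing key.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' or a literal word consumes exactly one word.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    // Lists the queues whose binding pattern matches the routing key.
    static List<String> queuesFor(String routingKey) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> binding : BINDINGS.entrySet()) {
            if (matches(binding.getValue(), routingKey)) {
                result.add(binding.getKey());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String[] keys = {"error.email", "error.order", "info.email", "warning.user"};
        for (String key : keys) {
            System.out.println(key + " -> " + queuesFor(key));
        }
    }
}
```

Under these rules error.email reaches all four queues, while info.order reaches only all_log_queue. Across the nine routing keys sent by topicSender, the four queues receive 9, 3, 1 and 3 messages respectively, 16 deliveries in total.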

9. In the RabbitMQSpringConsumer project, create the configuration file /RabbitMQSpringConsumer/src/main/java/spring-mvc.xml

<?xml version="1.0" encoding="UTF-8"?>  
<!-- For the latest schemaLocation values, see http://www.springframework.org/schema/ -->
<beans xmlns="http://www.springframework.org/schema/beans"   
       xmlns:aop="http://www.springframework.org/schema/aop"   
       xmlns:context="http://www.springframework.org/schema/context"  
       xmlns:mvc="http://www.springframework.org/schema/mvc"   
       xmlns:tx="http://www.springframework.org/schema/tx"   
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
       xsi:schemaLocation="http://www.springframework.org/schema/aop   
        http://www.springframework.org/schema/aop/spring-aop-4.0.xsd   
        http://www.springframework.org/schema/beans   
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd   
        http://www.springframework.org/schema/context   
        http://www.springframework.org/schema/context/spring-context-4.0.xsd   
        http://www.springframework.org/schema/mvc   
        http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd   
        http://www.springframework.org/schema/tx   
        http://www.springframework.org/schema/tx/spring-tx-4.0.xsd">  

      <!-- Enable MVC annotations -->
    <mvc:annotation-driven />

    <!-- Static resources; not intercepted by Spring MVC -->
    <mvc:resources location="/resources/" mapping="/resources/**"/>

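    <!-- Base package for Spring component scanning -->
    <context:component-scan base-package="com.study.demo" use-default-filters="false">
        <!-- Scan only controllers here so that services are not loaded a second time;
             use-default-filters="false" is needed for the include-filter to restrict the scan -->
        <context:include-filter type="annotation" expression="org.springframework.stereotype.Controller"/>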
    </context:component-scan>

      <!-- JSP view resolver -->
    <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">  
        <property name="prefix" value="/WEB-INF/views/" />  
        <property name="suffix" value=".jsp" />
    <!-- Set this view resolver's order to 1 -->
        <property name="order" value="1" />
    </bean>

</beans>  

10. In the RabbitMQSpringConsumer project, create the configuration file /RabbitMQSpringConsumer/src/main/webapp/WEB-INF/web.xml

<web-app xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xmlns="http://java.sun.com/xml/ns/javaee" xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
         version="3.0">
  <display-name>RabbitMqSpringConsumerDemo</display-name>

  <context-param>
    <param-name>logbackConfigLocation</param-name>
    <param-value>/WEB-INF/conf/logback.xml</param-value>
  </context-param>

  <!-- Spring character-encoding filter: start -->
  <filter>
    <filter-name>characterEncoding</filter-name>
    <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
    <init-param>
      <param-name>encoding</param-name>
      <param-value>UTF-8</param-value>
    </init-param>
    <init-param>
      <param-name>forceEncoding</param-name>
      <param-value>true</param-value>
    </init-param>
  </filter>
  <filter-mapping>
    <filter-name>characterEncoding</filter-name>
    <url-pattern>/*</url-pattern>
  </filter-mapping>
  <!-- Spring character-encoding filter: end -->



  <!-- Spring Application Context Listener Start -->
  <context-param>
    <param-name>contextConfigLocation</param-name>
    <param-value>classpath:applicationContext.xml</param-value>
  </context-param>
  <listener>
    <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
  </listener>
  <!-- Spring Application Context Listener End -->


  <!-- Spring MVC Config Start -->
  <servlet>
    <servlet-name>SpringMVC</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>

    <init-param>
      <param-name>contextConfigLocation</param-name>
      <param-value>classpath:spring-mvc.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
  </servlet>
  <servlet-mapping>
    <servlet-name>SpringMVC</servlet-name>
    <!-- Filter all resources -->
    <url-pattern>/</url-pattern>
  </servlet-mapping>
  <!-- Spring MVC Config End -->

</web-app>

11. Logging configuration file shared by the producer and the consumer: /RabbitMQSpringConsumer/src/main/webapp/WEB-INF/conf/logback.xml

<?xml version="1.0" encoding="UTF-8"?>
<configuration>

    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <!--<Pattern>%d{HH:mm:ss.SSS} [%T] %level %logger{36} - %msg%n</Pattern>-->
            <Pattern>%d{yyyy/MM/dd-HH:mm:ss} %level [%thread] %caller{1} - %msg%n</Pattern>
        </encoder>
    </appender>

    <!-- No additivity="false" here: these loggers have no appender of their own and rely on the root appender -->
    <logger name="com.study.demo" level="debug"/>
    <logger name="org.springframework" level="error"/>

    <root level="debug">
        <appender-ref ref="STDOUT"/>
    </root>

</configuration>

The consumer code is now complete.

12. Start the RabbitMQSpringConsumer consumer in Tomcat v8.5 on port 8081

98_3.png

Send a fanout message from the RabbitMQSpringProducer producer page

98_4.png

Check the output of the RabbitMQSpringConsumer consumer

98_5.png

Send a topic message from the RabbitMQSpringProducer producer page

98_6.png

Check the output of the RabbitMQSpringConsumer consumer

98_7.png

Where to get the sample code

Source: https://www.cnblogs.com/leeSmall/category/1299598.html

Permanent link to this article: https://tech.souyunku.com/?p=14458
