How to delete multiple queues in RabbitMQ

November 18, 2019
 

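Problem:

Testing against RabbitMQ left behind a large number of queues whose names start with amq.gen-. Deleting them one at a time is tedious. Is there a way to delete them all in one go?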

 

Solution:

  1. In a terminal, run rabbitmq-plugins enable rabbitmq_management to enable the management plugin. The plugin ships with a command-line tool, rabbitmqadmin, which is the tool used here.
  2. To delete every queue whose name contains "amq.gen-", run:
     rabbitmqadmin list queues name | awk '{print $2}' | grep "amq.gen-" | xargs -I {} rabbitmqadmin delete queue name={}

If the queues live in a particular vhost, add the option --vhost=<vhost name> right after each rabbitmqadmin command.

March 13, 2015
 

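Configuring RabbitMQ in Spring (notes kept for future reference)

This post focuses on how to integrate RabbitMQ into a Spring project.

The Maven dependency is configured as follows: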

<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.2.0.RELEASE</version>
    </dependency>
</dependencies>

1. First, the producer configuration

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration -->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>

    <!-- Exchange declaration, binding queue_one with the routing key queue_one_key -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Spring AMQP's JSON converter is based on Jackson; it turns the data the producer
         sends into JSON before it is stored in the queue. The author considers fastjson
         faster than Jackson, so a fastjson-based implementation is substituted here. -->
    <bean id="jsonMessageConverter" class="mq.convert.FastJsonMessageConverter"></bean>

    <!-- RabbitTemplate declaration -->
    <rabbit:template exchange="my-mq-exchange" id="amqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
</beans>

2. The fastjson message converter implementation

package mq.convert;

import java.io.UnsupportedEncodingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.AbstractMessageConverter;
import org.springframework.amqp.support.converter.MessageConversionException;

import fe.json.FastJson; // the original author's own fastjson wrapper class

public class FastJsonMessageConverter extends AbstractMessageConverter {
    private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

    public static final String DEFAULT_CHARSET = "UTF-8";

    private volatile String defaultCharset = DEFAULT_CHARSET;

    public FastJsonMessageConverter() {
        super();
    }

    public void setDefaultCharset(String defaultCharset) {
        this.defaultCharset = (defaultCharset != null) ? defaultCharset
                : DEFAULT_CHARSET;
    }

    // Left unimplemented: without a target type there is nothing to deserialize into.
    // Callers use the typed overload below instead.
    public Object fromMessage(Message message)
            throws MessageConversionException {
        return null;
    }

    // Decodes the message body as JSON and deserializes it into the runtime class of t.
    @SuppressWarnings("unchecked")
    public <T> T fromMessage(Message message, T t) {
        String json = "";
        try {
            json = new String(message.getBody(), defaultCharset);
        } catch (UnsupportedEncodingException e) {
            log.error("Unsupported charset: " + defaultCharset, e);
        }
        return (T) FastJson.fromJson(json, t.getClass());
    }

    // Serializes the object to JSON and wraps the bytes in an AMQP message.
    protected Message createMessage(Object objectToConvert,
            MessageProperties messageProperties)
            throws MessageConversionException {
        byte[] bytes = null;
        try {
            String jsonString = FastJson.toJson(objectToConvert);
            bytes = jsonString.getBytes(this.defaultCharset);
        } catch (UnsupportedEncodingException e) {
            throw new MessageConversionException(
                    "Failed to convert Message content", e);
        }
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        messageProperties.setContentEncoding(this.defaultCharset);
        if (bytes != null) {
            messageProperties.setContentLength(bytes.length);
        }
        return new Message(bytes, messageProperties);
    }
}

3. Sending from the producer side

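import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;

public class MyMqGatway {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Sends obj to the template's default exchange (my-mq-exchange)
    // using queue_one_key as the routing key.
    public void sendDataToCrQueue(Object obj) {
        amqpTemplate.convertAndSend("queue_one_key", obj);
    }
}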

4. The consumer configuration (much the same as the producer's)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
            http://www.springframework.org/schema/beans
                http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
                http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/rabbit
                http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

    <!-- Connection settings -->
    <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"
        password="guest" port="5672"/>

    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- Queue declaration -->
    <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>

    <!-- Exchange declaration, binding queue_one with the routing key queue_one_key -->
    <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">
        <rabbit:bindings>
            <rabbit:binding queue="queue_one" key="queue_one_key"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- Queue listener (observer pattern): when a message arrives, the listener object
         registered on that queue is notified. The taskExecutor and queueOneLitener beans
         referenced here are not declared in this file and must be defined elsewhere
         in the application context. -->
    <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">
        <rabbit:listener queues="queue_one" ref="queueOneLitener"/>
    </rabbit:listener-container>
</beans>

5. The consumer-side listener

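import java.nio.charset.Charset;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;

public class QueueOneLitener implements MessageListener {
    @Override
    public void onMessage(Message message) {
        // getBody() returns a byte[]; decode it, otherwise only the array reference is printed.
        String body = new String(message.getBody(), Charset.forName("UTF-8"));
        System.out.println(" data :" + body);
    }
}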

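6. Because the listener object is notified as each message arrives on the queue, the consumer cannot fetch messages in batches or write them to the database in batches. One workaround is to keep a temporary local queue on the consumer side: store each message taken from RabbitMQ in that local queue and let a background thread process the contents in batches on a timer.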
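
The local-queue idea in step 6 could be sketched roughly as follows. This is only a minimal illustration using the JDK's java.util.concurrent classes, not code from the original post: the class name BatchBuffer, its methods, and the batch handler callback are all hypothetical, and the capacity, batch size, and flush period are values you would have to choose for your own workload.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: buffers items locally and hands them to a handler in batches. */
final class BatchBuffer<T> {
    private final BlockingQueue<T> buffer;
    private final int maxBatchSize;
    private final Consumer<List<T>> batchHandler;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    BatchBuffer(int capacity, int maxBatchSize, Consumer<List<T>> batchHandler) {
        this.buffer = new LinkedBlockingQueue<T>(capacity);
        this.maxBatchSize = maxBatchSize;
        this.batchHandler = batchHandler;
    }

    /** Meant to be called from the listener; returns false if the buffer is full. */
    boolean add(T item) {
        return buffer.offer(item);
    }

    /** Drains up to maxBatchSize items, passes them to the handler, returns the count. */
    int flush() {
        List<T> batch = new ArrayList<T>(maxBatchSize);
        buffer.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            batchHandler.accept(batch); // e.g. one bulk insert into the database
        }
        return batch.size();
    }

    /** Starts a background thread that flushes at a fixed delay. */
    void start(long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                flush();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all later runs, so report and carry on.
                e.printStackTrace();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    void stop() {
        scheduler.shutdown();
    }
}
```

In this sketch the listener's onMessage would decode the body and call add(...), while start(...) is invoked once at application startup. One caveat to weigh: with acknowledge="auto" a message is acknowledged once onMessage returns normally, so anything still sitting in the local buffer is lost if the consumer process dies before the next flush.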

Source: http://my.oschina.net/never/blog/140368