博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java 操作 RabbitMQ 发送、接受消息
阅读量:6529 次
发布时间:2019-06-24

本文共 7279 字,大约阅读时间需要 24 分钟。

例子1

Producer.java

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class Producer {    public final static String QUEUE_NAME="rabbitMQ_test2";    public static void main(String[] args) throws IOException, TimeoutException {        //创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置RabbitMQ相关信息        factory.setHost("100.51.15.10");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setPort(5672);        //创建一个新的连接        Connection connection = factory.newConnection();        //创建一个通道        Channel channel = connection.createChannel();        // 声明一个队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        //发送消息到队列中        String message = "Hello RabbitMQ";        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));        System.out.println("Producer Send +'" + message + "'");        //关闭通道和连接        channel.close();        connection.close();    }}

Consumer.java

import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.DefaultConsumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.AMQP;public class Customer {    private final static String QUEUE_NAME = "rabbitMQ_test2";    public static void main(String[] args) throws IOException, TimeoutException {        // 创建连接工厂        ConnectionFactory factory = new ConnectionFactory();        //设置RabbitMQ地址        factory.setHost("100.51.15.10");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setPort(5672);        //创建一个新的连接        Connection connection = factory.newConnection();        //创建一个通道        Channel channel = connection.createChannel();        //声明要关注的队列        channel.queueDeclare(QUEUE_NAME, false, false, false, null);        System.out.println("Customer Waiting Received messages");        //DefaultConsumer类实现了Consumer接口,通过传入一个频道,        // 告诉服务器我们需要那个频道的消息,如果频道中有消息,就会执行回调函数handleDelivery        Consumer consumer = new DefaultConsumer(channel) {            public void handleDelivery(String consumerTag, Envelope envelope,                                       AMQP.BasicProperties properties, byte[] body)                    throws IOException {                String message = new String(body, "UTF-8");                System.out.println("Customer Received '" + message + "'");            }        };        //自动回复队列应答 -- RabbitMQ中的消息确认机制        channel.basicConsume(QUEUE_NAME, true, consumer);    }}

执行

Producer.java

Producer Send +'Hello RabbitMQ'Producer Send +'Hello RabbitMQ'

Consumer.java

Customer Received 'Hello RabbitMQ'Customer Received 'Hello RabbitMQ'

例子2

首先写一个类,将产生产者和消费者统一为 EndPoint类型的队列。不管是生产者还是消费者,连接队列的代码都是一样的,这样可以通用一些。

EndPoint.java

//package co.syntx.examples.rabbitmq;import java.io.IOException;import java.util.concurrent.TimeoutException;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;/** * Represents a connection with a queue * @author syntx * */public abstract class EndPoint{    protected Channel channel;    protected Connection connection;    protected String endPointName;    public EndPoint(String endpointName) throws IOException{        this.endPointName = endpointName;        //Create a connection factory        ConnectionFactory factory = new ConnectionFactory();        //hostname of your rabbitmq server        factory.setHost("100.51.15.10");        factory.setUsername("admin");        factory.setPassword("admin");        factory.setPort(5672);        //getting a connection        try{            connection = factory.newConnection();        }catch (TimeoutException ex) {            System.out.println(ex);            connection = null;        }        //creating a channel        channel = connection.createChannel();        //declaring a queue for this channel. If queue does not exist,        //it will be created on the server.        channel.queueDeclare(endpointName, false, false, false, null);    }    /**     * 关闭channel和connection。并非必须,因为隐含是自动调用的。     * @throws IOException     */    public void close() throws IOException{        try{            this.channel.close();        } catch (TimeoutException ex){            System.out.println("ex" + ex);        }        this.connection.close();    }}

Producer2.java

import java.io.IOException;import java.io.Serializable;import org.apache.commons.lang.SerializationUtils;public class Producer2 extends EndPoint{    public Producer2(String endPointName) throws IOException{        super(endPointName);    }    public void sendMessage(Serializable object) throws IOException {        channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));    }}

QueueConsumer.java

import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.commons.lang.SerializationUtils;import com.rabbitmq.client.AMQP.BasicProperties;import com.rabbitmq.client.Consumer;import com.rabbitmq.client.Envelope;import com.rabbitmq.client.ShutdownSignalException;public class QueueConsumer extends EndPoint implements Runnable, Consumer{    public QueueConsumer(String endPointName) throws IOException{        super(endPointName);    }    public void run() {        try {            //start consuming messages. Auto acknowledge messages.            channel.basicConsume(endPointName, true,this);        } catch (IOException e) {            e.printStackTrace();        }    }    /**     * Called when consumer is registered.     */    public void handleConsumeOk(String consumerTag) {        System.out.println("Consumer "+consumerTag +" registered");    }    /**     * Called when new message is available.     */    public void handleDelivery(String consumerTag, Envelope env,                               BasicProperties props, byte[] body) throws IOException {        Map map = (HashMap)SerializationUtils.deserialize(body);        System.out.println("Message Number "+ map.get("message number") + " received.");    }    public void handleCancel(String consumerTag) {}    public void handleCancelOk(String consumerTag) {}    public void handleRecoverOk(String consumerTag) {}    public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}}

Main.java

import java.io.IOException;import java.sql.SQLException;import java.util.HashMap;public class Main {    public Main() throws Exception{        QueueConsumer consumer = new QueueConsumer("queue");        Thread consumerThread = new Thread(consumer);        consumerThread.start();        Producer2 producer = new Producer2("queue");        for (int i = 0; i < 5; i++) {            HashMap message = new HashMap();            message.put("message number", i);            producer.sendMessage(message);            System.out.println("Message Number "+ i +" sent.");        }    }    public static void main(String[] args) throws Exception{        new Main();        System.out.println("##############end...");    }}

 

转载地址:http://tyqbo.baihongyu.com/

你可能感兴趣的文章
virtual PC 打造IE6、IE7、IE8、IE9等多版本共存原版测试环境
查看>>
js面向对象1
查看>>
内部类
查看>>
高速数论变换(NTT)
查看>>
Springmvc的跳转方式
查看>>
加密原理介绍,代码实现DES、AES、RSA、Base64、MD5
查看>>
LINUX中常用操作命令
查看>>
python 获取进程pid号
查看>>
链表中插入一个节点的三种情况
查看>>
洛谷.4180.[模板]次小生成树Tree(Kruskal LCA 倍增)
查看>>
TCL函数“参数自动补全” 与 “help 信息显示”
查看>>
POJ1050To the Max
查看>>
汇编基础--标识符、标号、伪指令和指令
查看>>
Linux软中断、tasklet和工作队列
查看>>
如何解决ORA-28002 the password will expire within 7 days问题(密码快过期)
查看>>
Asp.Net Core 轻松学-利用日志监视进行服务遥测
查看>>
LightSwitch社区资源搜集
查看>>
Android通讯录查询篇--ContactsContract.Data 二(续)
查看>>
IT人的自我导向型学习:开篇杂谈
查看>>
[原创]BizTalk动手实验系列目录
查看>>