- 配置文件
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Helper that opens connections to a RabbitMQ broker using the fixed
 * settings declared as constants below.
 */
public class RabbitMQConnectionUtilLocal {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VIRTUAL_HOST = "/";
    private static final String USERNAME = "guest";
    private static final String PASSWORD = "guest";

    /**
     * Builds a {@link ConnectionFactory} from the constants of this class,
     * switches automatic recovery on and opens a new connection.
     *
     * @return a newly opened connection; the caller is responsible for closing it
     * @throws Exception if the connection cannot be established
     */
    public static Connection getConnection() throws Exception {
        ConnectionFactory localFactory = new ConnectionFactory();

        // Broker address.
        localFactory.setHost(HOST);
        localFactory.setPort(PORT);
        localFactory.setVirtualHost(VIRTUAL_HOST);

        // Credentials.
        localFactory.setUsername(USERNAME);
        localFactory.setPassword(PASSWORD);

        localFactory.setAutomaticRecoveryEnabled(true);

        Connection opened = localFactory.newConnection();
        return opened;
    }
}
- direct生产端
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

/**
 * Example producer for a direct exchange: declares the exchange and publishes
 * one message for each of the routing keys "error", "info" and "warning".
 */
public class RoutingProducer {
    private final static String EXCHANGE_NAME = "direct_exchange";

    /**
     * Publishes the three sample messages and then releases the channel and
     * the connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring the exchange, publishing or
     *                   closing fails
     */
    public static void main(String[] argv) throws Exception {
        // try-with-resources closes the channel first and then the connection,
        // also when declaring or publishing throws, so nothing leaks on failure.
        try (Connection connection = RabbitMQConnectionUtil.getConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            publish(channel, "error", "{\"flag\":0,\"idList\":[\"1\"]}");
            publish(channel, "info", "Info log message");
            publish(channel, "warning", "Warning log message");
        }
    }

    /**
     * Publishes one message to {@link #EXCHANGE_NAME} and logs it to stdout.
     *
     * @param channel    open channel to publish on
     * @param routingKey routing key for the message
     * @param message    message text; encoded as UTF-8
     * @throws java.io.IOException if publishing fails
     */
    private static void publish(Channel channel, String routingKey, String message)
            throws java.io.IOException {
        // Explicit charset: the consumer in this file decodes bodies as UTF-8, and a
        // bare getBytes() would use the platform default encoding instead.
        channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
    }
}
- fanout生产端
import com.rabbitmq.client.*;
import jzy.util.RabbitMQConnectionUtil;

/**
 * Example producer for a fanout exchange: declares the exchange, binds a
 * server-named queue to it and publishes a single message.
 */
public class FanoutExchangeExample {
    private static final String EXCHANGE_NAME = "20250917test1";

    /**
     * Publishes one sample message and then releases the channel and the
     * connection.
     *
     * @param argv unused
     * @throws Exception if connecting, declaring, binding, publishing or
     *                   closing fails
     */
    public static void main(String[] argv) throws Exception {
        // try-with-resources closes the channel first and then the connection,
        // also when any broker call below throws, so nothing leaks on failure.
        try (Connection connection = RabbitMQConnectionUtilLocal.getConnection();
             Channel channel = connection.createChannel()) {
            // Third argument is the "durable" flag of the exchange.
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true);

            // The no-argument queueDeclare() asks the broker for a server-named queue.
            // NOTE(review): per the client API such a queue is exclusive and auto-delete,
            // so it presumably disappears when this connection closes and only serves to
            // demonstrate binding - confirm whether a real consumer queue is intended.
            String queueName = channel.queueDeclare().getQueue();
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            String message = "This is a log message";
            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
- 消费端
package jzy.util;import cn.hutool.json.JSONArray;
import cn.hutool.json.JSONObject;
import com.rabbitmq.client.*;
import org.springframework.web.bind.annotation.RequestBody;import java.util.List;public class RabbitMQConsumer {
private final static String EXCHANGE_NAME = "position_exchange";private final static String QUEUE_NAME = "LZF0101_queue";public static void main(String[] argv) throws Exception {Connection connection = RabbitMQConnectionUtil.getConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "LZF0101_topic");DefaultConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws java.io.IOException {String message = new String(body, "UTF-8");JSONObject jsonObject = JsonUtil.getJsonToBean(replaceNbspWithSpace(message), JSONObject.class);try {switch (jsonObject.getStr("flag")) {default:break;}} catch (Exception e) {throw new RuntimeException(e);}System.out.println(" [x] Received Log: '" + message + "'");}};channel.basicConsume(QUEUE_NAME, true, consumer);}public static String replaceNbspWithSpace(String str) {if (str == null) {return null;}return str.replaceAll("\\u00A0", " ");}
}