信创产品TongLinkQ安装及springboot2整合使用
安装
参考1
参考2
上传
上传到/home/tlq目录下
解压,将license.dat拷贝到TLQ8中
创建用户
sudo useradd tlq 新建用户
sudo passwd tlq 设置密码
su tlq (在tql用户下操作)
设置环境变量
cd /home/tlq/TLQ8/cat ./setp >> ~/.bash_profile生效
source ~/.bash_profile查看
env|grep TLQ
启动
tlq #启动TongLink/Q
tlq -cstop # 停止 TongLink/Q
tlq -cabort #强制停止服务
springboot2集成
解压后的目录中存在sample示例代码
集成代码
消息发送
package com.justai.icp.sms.config;import com.tongtech.tmqi.QueueConnectionFactory;import javax.jms.*;public class QueueSenderNoJNDI {public static void main(String[] args) {//==发送消息的目的队列String queName = "lq";//==连接工厂类QueueConnectionFactory queueConnectionFactory = null;//==连接类QueueConnection queueConnection = null;//==会话类QueueSession queueSession = null;//==打开的队列Queue queue = null;//==生产者QueueSender queueSender = null;try {//==创建连接工厂对象,并设置服务器地址信息,如果应用和TLQ服务端不在同一台机器上,请使用实际的服务端IP和Port替代下方的127.0.0.1和10024queueConnectionFactory = new QueueConnectionFactory();queueConnectionFactory.setProperty("tmqiAddressList", "tlkq://10.10.12.85:10024");//==创建Connection和SessionqueueConnection = queueConnectionFactory.createQueueConnection();queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//==打开TLQ 队列和创建QueueSenderqueue = queueSession.createQueue(queName);queueSender = queueSession.createSender(queue);//==启动连接queueConnection.start();//==生成一个TEXT类型消息MapMessage mapMessage = queueSession.createMapMessage();System.out.println("发送消息...");mapMessage.setString("type","sms");mapMessage.setString("msg","哈哈哈");queueSender.send(mapMessage);System.out.println("发送完成...");} catch (Exception jmse) {System.out.println("Exception oxxurred :" + jmse);jmse.printStackTrace();} finally {try {if (queueSession != null) {//==关闭会话queueSession.close();}if (queueConnection != null) {//==关闭连接queueConnection.close();}} catch (Exception e) {System.out.println("退出时发生错误。");e.printStackTrace();}}}
}
消息接收
package com.justai.icp.sms.config;import com.tongtech.jms.FileMessage;
import com.tongtech.tmqi.QueueConnectionFactory;import javax.jms.*;public class QueueReceiverNoJNDI {public static void main(String[] args) {//==发送消息的目的队列String queName = "lq";//==连接工厂类QueueConnectionFactory queueConnectionFactory = null;//==连接类QueueConnection queueConnection = null;//==会话类QueueSession queueSession = null;//==打开的队列Queue queue = null;//==消费者QueueReceiver queueReceiver = null;try {//==创建连接工厂对象,并设置服务器地址信息,如果应用和TLQ服务端不在同一台机器上,请使用实际的服务端IP和Port替代下方的127.0.0.1和10024queueConnectionFactory = new QueueConnectionFactory();queueConnectionFactory.setProperty("tmqiAddressList", "tlkq://10.10.12.85:10024");//==创建Connection和SessionqueueConnection = queueConnectionFactory.createQueueConnection();queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//==打开TLQ 队列和创建QueueReceiverqueue = queueSession.createQueue(queName);queueReceiver = queueSession.createReceiver(queue);//==启动连接queueConnection.start();//==接收消息,参数为超时时间,单位毫秒,如果不填写时间,则receive会无限期等待,直到接收到消息Message message = queueReceiver.receive(2000);//==判断接收到的消息的类型,并输出内容if (message != null) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage)message;System.out.println("收到一条Text消息:" + textMessage.getText());} else if (message instanceof MapMessage) {System.out.println("收到一条Map消息");} else if (message instanceof StreamMessage) {System.out.println("收到一条Text消息");} else if (message instanceof BytesMessage) {System.out.println("收到一条Bytes消息");} else if (message instanceof ObjectMessage) {System.out.println("收到一条Object消息");} else if (message instanceof FileMessage) {System.out.println("收到一条文件消息");}} else {System.out.println("没有收到消息");}} catch (Exception jmse) {System.out.println("Exception oxxurred :" + jmse.toString());jmse.printStackTrace();} finally {try {if (queueSession != null) {//==关闭会话queueSession.close();}if (queueConnection != null) {//==关闭连接queueConnection.close();}} catch (Exception e) {System.out.println("退出时发生错误。");e.printStackTrace();}}}
}
消息异步接收
package com.justai.icp.sms.config;import com.tongtech.jms.FileMessage;import javax.jms.*;
import java.util.Properties;public class QueueReceiverAsynNewJNDI {public static final String tcf = "tongtech.jms.jndi.JmsContextFactory";/* initial context factory*/public static final String remoteURL = "tlq://10.10.12.85:10024";public static final String remoteFactory = "RemoteConnectionFactory";public static void main(String[] args) {ConnectionFactory testConnFactory = null;Connection myConn = null;Session mySession = null;Queue testQueue = null;MessageConsumer testConsumer = null;try {//==设置TLQ JMS连接工厂Properties pro = new Properties();pro.put("java.naming.factory.initial", tcf);//==设置JNDI服务器URL,如果本应用程序例子和JNDI服务器不在一起,请用JNDI服务器IP地址取代127.0.0.1;//==同时要修改服务端配置tlqjndi.conf中RemoteConnectionFactory的tmqiAddressList修改为JMS服务器IPpro.setProperty("java.naming.provider.url", remoteURL);//==查找TLQ JMS连接工厂和消息队列javax.naming.Context ctx = new javax.naming.InitialContext(pro);//查找指定名称的连接工厂信息,RemoteConnectionFactory对应的是TLQ服务节点tlqjndi.conf中[JndiSystem]->[Factory]->FactoryName配置项testConnFactory = (ConnectionFactory) ctx.lookup(remoteFactory);//查找指定的JNDI队列名称,this.pQueName对应的是TLQ服务节点tlqjndi.conf中[JndiSystem]->[JndiQueue]->JndiQueueName配置项testQueue = (Queue) ctx.lookup("MyQueue");//==创建Connection,此时并不能立即开始收发消息,必须调用start启动连接,Connection是线程安全的myConn = testConnFactory.createConnection();//创建会话,此会话非线程安全,所以一个会话只能用在一个线程中mySession = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);//==打开TLQ 队列和创建消费者testConsumer = mySession.createConsumer(testQueue);//==创建并设置监听器MessageRec receiver = new MessageRec();testConsumer.setMessageListener(receiver);//==启动连接myConn.start();synchronized (receiver) {receiver.wait();}} catch (Exception jmse) {System.out.println("Exception oxxurred :" + jmse.toString());jmse.printStackTrace();} finally {try {if (mySession != null) {//==关闭会话mySession.close();}if (myConn != null) {//==关闭连接myConn.close();}} catch (Exception e) {System.out.println("退出时发生错误。");e.printStackTrace();}}}} //==监听器类,重写了onMessage方法,在onMessage方法中实现业务处理
class MessageRec implements javax.jms.MessageListener{public void onMessage(Message message) {try{//==判断接收到的消息的类型,并输出内容if(message instanceof TextMessage){TextMessage textMessage = (TextMessage)message;System.out.println("收到一条Text消息:"+textMessage.getText());}else if(message instanceof MapMessage ){MapMessage mapMessage = (MapMessage)message;System.out.println("收到一条Map消息"+mapMessage.getString("type"));}else if(message instanceof StreamMessage ){System.out.println("收到一条Text消息");}else if(message instanceof BytesMessage ){System.out.println("收到一条Bytes消息");}else if(message instanceof ObjectMessage ){System.out.println("收到一条Object消息");}else if(message instanceof FileMessage ){System.out.println("收到一条File消息");}
// synchronized (this) {
// notify();
// }}catch(Exception jmse){System.out.println("Exception oxxurred :" + jmse.toString());jmse.printStackTrace();}}
}
springboot2异步监听
package com.justai.icp.sms.consumer;import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.justai.icp.common.core.domain.R;
import com.justai.icp.common.domain.system.domain.SysMessageUser;
import com.justai.icp.common.domain.system.domain.SysUser;
import com.justai.icp.common.redis.service.RedisService;
import com.justai.icp.feign.remote.system.RemoteUserService;
import com.justai.icp.sms.dto.SysMsgDto;
import com.justai.icp.sms.enums.MessageStatusEnum;
import com.justai.icp.sms.service.IMessageDataService;
import com.justai.icp.sms.utils.CMSSmsUtil;
import com.tongtech.jms.FileMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.*;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;import static com.justai.icp.common.domain.system.enums.MessageTypeEnum.*;@Slf4j
@Component
public class MsgConsumer {private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();@Autowiredprivate IMessageDataService messageDataService;@Autowiredprivate RedisService redisService;@Autowiredprivate RemoteUserService remoteUserService;@Value("${tlq.remoteServer}")private String tlqRemoteServer;private static final String TCF = "tongtech.jms.jndi.JmsContextFactory";private static final String REMOTE_URL = "tlq://";private static final String REMOTE_FACTORY = "RemoteConnectionFactory";private static final String QUEUE_NAME = "MyQueue";private Connection connection;com.tongtech.tmqi.QueueConnectionFactory queueConnectionFactory = null;//==连接类QueueConnection queueConnection = null;//==会话类QueueSession queueSession = null;//==打开的队列Queue queue = null;//==消费者QueueReceiver queueReceiver = null;/*** 容器启动后自动执行*/@PostConstructpublic void startListener() {try {queueConnectionFactory = new com.tongtech.tmqi.QueueConnectionFactory();queueConnectionFactory.setProperty("tmqiAddressList", "tlkq://10.10.12.85:10024");//==创建Connection和SessionqueueConnection = queueConnectionFactory.createQueueConnection();queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);Queue queue = queueSession.createQueue("lq");connection = queueConnectionFactory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageConsumer consumer = session.createConsumer(queue);// 注册监听consumer.setMessageListener(new MessageListenerImpl());connection.start();log.info("TongLinkQ 消息监听器已启动,监听队列:{}", QUEUE_NAME);} catch (Exception e) {e.printStackTrace();log.error("启动 TongLinkQ 监听器失败", e);}}/*** 容器关闭时释放连接*/@PreDestroypublic void stop() {try {if (connection != null) {connection.close();log.info("TongLinkQ 连接已关闭");}} catch (JMSException e) {log.error("关闭 TongLinkQ 连接异常", e);}}/*** 真正的消息处理类*/private class MessageListenerImpl implements MessageListener {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {log.info("收到 Text 消息: {}", ((TextMessage) message).getText());} else if (message instanceof MapMessage) {String type = ((MapMessage) message).getString("type");String msg = ((MapMessage) message).getString("msg");String msgId = ((MapMessage) message).getString("msgId");log.info("收到 Map 消息: {}", msg);dealMsg(type,msg);} else if (message instanceof FileMessage) {log.info("收到 File 消息");} else {log.info("收到其他类型消息: {}", message.getClass().getSimpleName());}} catch (JMSException e) {log.error("处理消息异常", e);}}}private void dealMsg(String type,String msg){if (type.equals(String.valueOf(INTERNAL))){}else if (type.equals(String.valueOf(MAIL))){}else if (type.equals(String.valueOf(SMS))){dealSmsMsgForYC(msg);}else {}}public void dealSmsMsgForYC(String msg){System.out.println("deal sms msg");log.info("deal sms msg");try {SysMsgDto msgDto = JSON.parseObject(msg, SysMsgDto.class);String redisKey = "msglock:" + msgDto.getId();//幂等性处理,避免重复消费Boolean lock = redisService.redisTemplate.opsForValue().setIfAbsent(redisKey, Duration.ofSeconds(300));if (Boolean.TRUE.equals(lock)){System.out.println("进入分布式锁");try{//获取消息状态 (推送状态:1未处理;2已推送;3已入队;4已完成;5异常)int pushStatus = messageDataService.getMsgPushStatus(msgDto.getId());System.out.println("判断状态"+pushStatus);if (pushStatus != MessageStatusEnum.COMPLETED.getStatus()){//获取消息-用户List<SysMessageUser> messageUsers = getSysMessageUsers(msgDto);//获取用户信息列表R<List<SysUser>> listR = remoteUserService.getUserListByIds(new HashSet<>(msgDto.getReceiveUser()));System.out.println(JSON.toJSONString(listR));log.info("get user list:{}",JSON.toJSONString(listR));if (listR.isSuccess()){List<SysUser> userList = listR.getData();//收集邮箱列表List<String> phoneList = userList.parallelStream().map(SysUser::getPhonenumber).collect(Collectors.toList());//发送邮件for (String phone : phoneList){String content = msgDto.getMessageData();// 发送短信int result = 0;try{result = CMSSmsUtil.sendBatchMessage(phone, content);} catch (IOException e){throw new RuntimeException(e);}// 根据短信发送状态返回参数if( result == 1 ){System.out.println("手机号码" +phone + "短信发送成功!");log.info("手机号码" +phone + "短信发送成功!");} else {System.out.println("手机号码" +phone + "短信发送失败!");log.error("手机号码" +phone + "短信发送失败,请稍后再试!");}}}System.out.println("发送完成");//保存messageDataService.saveMsgUserBatch(messageUsers);//修改消息状态messageDataService.updateMsgStatus(String.valueOf(msgDto.getId()),MessageStatusEnum.COMPLETED);}}catch (Exception e){log.error(e.getMessage());e.printStackTrace();}finally{//释放锁redisService.deleteObject(redisKey);}}} catch (Exception e) {e.printStackTrace();//手动应答log.error("邮件发送失败:" + e.getMessage());}}/*** 收集数据* @param msgDto 消息对象* @return 消息用户集合*/private List<SysMessageUser> getSysMessageUsers(SysMsgDto msgDto){List<String> receiveUserList = msgDto.getReceiveUser();//收集要发送的用户信息List<SysMessageUser> messageUsers = new ArrayList<>();for (String receiveUserId : receiveUserList){SysMessageUser messageUser = new SysMessageUser();messageUser.setMessageId(msgDto.getId());messageUser.setUserId(receiveUserId);messageUser.setMsgType(msgDto.getMessageType());messageUser.setStatus("1");messageUsers.add(messageUser);}return messageUsers;}
}