当前位置: 首页 > news >正文

信创产品TongLinkQ安装及springboot2整合使用

安装

参考1
参考2

上传

在这里插入图片描述
上传到/home/tlq目录下
在这里插入图片描述
解压,将license.dat拷贝到TLQ8中
在这里插入图片描述

创建用户

sudo useradd tlq 新建用户
sudo passwd tlq 设置密码
su tlq (在tql用户下操作)

设置环境变量

 cd /home/tlq/TLQ8/cat ./setp >> ~/.bash_profile生效
source ~/.bash_profile查看
env|grep TLQ

启动

tlq        #启动TongLink/Q
tlq  -cstop   #  停止 TongLink/Q 
tlq  -cabort   #强制停止服务

springboot2集成

解压后的目录中存在sample示例代码
在这里插入图片描述
集成代码
消息发送

package com.justai.icp.sms.config;import com.tongtech.tmqi.QueueConnectionFactory;import javax.jms.*;public class QueueSenderNoJNDI {public static void main(String[] args) {//==发送消息的目的队列String queName = "lq";//==连接工厂类QueueConnectionFactory queueConnectionFactory = null;//==连接类QueueConnection queueConnection = null;//==会话类QueueSession queueSession = null;//==打开的队列Queue queue = null;//==生产者QueueSender queueSender = null;try {//==创建连接工厂对象,并设置服务器地址信息,如果应用和TLQ服务端不在同一台机器上,请使用实际的服务端IP和Port替代下方的127.0.0.1和10024queueConnectionFactory = new QueueConnectionFactory();queueConnectionFactory.setProperty("tmqiAddressList", "tlkq://10.10.12.85:10024");//==创建Connection和SessionqueueConnection = queueConnectionFactory.createQueueConnection();queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//==打开TLQ 队列和创建QueueSenderqueue = queueSession.createQueue(queName);queueSender = queueSession.createSender(queue);//==启动连接queueConnection.start();//==生成一个TEXT类型消息MapMessage mapMessage = queueSession.createMapMessage();System.out.println("发送消息...");mapMessage.setString("type","sms");mapMessage.setString("msg","哈哈哈");queueSender.send(mapMessage);System.out.println("发送完成...");} catch (Exception jmse) {System.out.println("Exception oxxurred :" + jmse);jmse.printStackTrace();} finally {try {if (queueSession != null) {//==关闭会话queueSession.close();}if (queueConnection != null) {//==关闭连接queueConnection.close();}} catch (Exception e) {System.out.println("退出时发生错误。");e.printStackTrace();}}}
}

消息接收

package com.justai.icp.sms.config;import com.tongtech.jms.FileMessage;
import com.tongtech.tmqi.QueueConnectionFactory;import javax.jms.*;public class QueueReceiverNoJNDI {public static void main(String[] args) {//==发送消息的目的队列String queName = "lq";//==连接工厂类QueueConnectionFactory queueConnectionFactory = null;//==连接类QueueConnection queueConnection = null;//==会话类QueueSession queueSession = null;//==打开的队列Queue queue = null;//==消费者QueueReceiver queueReceiver = null;try {//==创建连接工厂对象,并设置服务器地址信息,如果应用和TLQ服务端不在同一台机器上,请使用实际的服务端IP和Port替代下方的127.0.0.1和10024queueConnectionFactory = new QueueConnectionFactory();queueConnectionFactory.setProperty("tmqiAddressList", "tlkq://10.10.12.85:10024");//==创建Connection和SessionqueueConnection = queueConnectionFactory.createQueueConnection();queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);//==打开TLQ 队列和创建QueueReceiverqueue = queueSession.createQueue(queName);queueReceiver = queueSession.createReceiver(queue);//==启动连接queueConnection.start();//==接收消息,参数为超时时间,单位毫秒,如果不填写时间,则receive会无限期等待,直到接收到消息Message message = queueReceiver.receive(2000);//==判断接收到的消息的类型,并输出内容if (message != null) {if (message instanceof TextMessage) {TextMessage textMessage = (TextMessage)message;System.out.println("收到一条Text消息:" + textMessage.getText());} else if (message instanceof MapMessage) {System.out.println("收到一条Map消息");} else if (message instanceof StreamMessage) {System.out.println("收到一条Text消息");} else if (message instanceof BytesMessage) {System.out.println("收到一条Bytes消息");} else if (message instanceof ObjectMessage) {System.out.println("收到一条Object消息");} else if (message instanceof FileMessage) {System.out.println("收到一条文件消息");}} else {System.out.println("没有收到消息");}} catch (Exception jmse) {System.out.println("Exception oxxurred :" + jmse.toString());jmse.printStackTrace();} finally {try {if (queueSession != null) {//==关闭会话queueSession.close();}if (queueConnection != null) {//==关闭连接queueConnection.close();}} catch (Exception e) {System.out.println("退出时发生错误。");e.printStackTrace();}}}
}

消息异步接收

package com.justai.icp.sms.config;import com.tongtech.jms.FileMessage;import javax.jms.*;
import java.util.Properties;public class QueueReceiverAsynNewJNDI {public static final String tcf = "tongtech.jms.jndi.JmsContextFactory";/* initial context factory*/public static final String remoteURL = "tlq://10.10.12.85:10024";public static final String remoteFactory = "RemoteConnectionFactory";public static void main(String[] args) {ConnectionFactory testConnFactory = null;Connection myConn = null;Session mySession = null;Queue testQueue = null;MessageConsumer testConsumer = null;try {//==设置TLQ JMS连接工厂Properties pro = new Properties();pro.put("java.naming.factory.initial", tcf);//==设置JNDI服务器URL,如果本应用程序例子和JNDI服务器不在一起,请用JNDI服务器IP地址取代127.0.0.1;//==同时要修改服务端配置tlqjndi.conf中RemoteConnectionFactory的tmqiAddressList修改为JMS服务器IPpro.setProperty("java.naming.provider.url", remoteURL);//==查找TLQ JMS连接工厂和消息队列javax.naming.Context ctx = new javax.naming.InitialContext(pro);//查找指定名称的连接工厂信息,RemoteConnectionFactory对应的是TLQ服务节点tlqjndi.conf中[JndiSystem]->[Factory]->FactoryName配置项testConnFactory = (ConnectionFactory) ctx.lookup(remoteFactory);//查找指定的JNDI队列名称,this.pQueName对应的是TLQ服务节点tlqjndi.conf中[JndiSystem]->[JndiQueue]->JndiQueueName配置项testQueue = (Queue) ctx.lookup("MyQueue");//==创建Connection,此时并不能立即开始收发消息,必须调用start启动连接,Connection是线程安全的myConn = testConnFactory.createConnection();//创建会话,此会话非线程安全,所以一个会话只能用在一个线程中mySession = myConn.createSession(false, Session.AUTO_ACKNOWLEDGE);//==打开TLQ 队列和创建消费者testConsumer = mySession.createConsumer(testQueue);//==创建并设置监听器MessageRec receiver = new MessageRec();testConsumer.setMessageListener(receiver);//==启动连接myConn.start();synchronized (receiver) {receiver.wait();}} catch (Exception jmse) {System.out.println("Exception oxxurred :" + jmse.toString());jmse.printStackTrace();} finally {try {if (mySession != null) {//==关闭会话mySession.close();}if (myConn != null) {//==关闭连接myConn.close();}} catch (Exception e) {System.out.println("退出时发生错误。");e.printStackTrace();}}}}	//==监听器类,重写了onMessage方法,在onMessage方法中实现业务处理
class MessageRec implements javax.jms.MessageListener{public void onMessage(Message message) {try{//==判断接收到的消息的类型,并输出内容if(message instanceof TextMessage){TextMessage textMessage = (TextMessage)message;System.out.println("收到一条Text消息:"+textMessage.getText());}else if(message instanceof MapMessage ){MapMessage mapMessage = (MapMessage)message;System.out.println("收到一条Map消息"+mapMessage.getString("type"));}else if(message instanceof StreamMessage ){System.out.println("收到一条Text消息");}else if(message instanceof BytesMessage ){System.out.println("收到一条Bytes消息");}else if(message instanceof ObjectMessage ){System.out.println("收到一条Object消息");}else if(message instanceof FileMessage ){System.out.println("收到一条File消息");}
//				synchronized (this) {
//					notify();
//				}}catch(Exception jmse){System.out.println("Exception oxxurred :" + jmse.toString());jmse.printStackTrace();}}
}

springboot2异步监听

package com.justai.icp.sms.consumer;import com.alibaba.fastjson.JSON;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.justai.icp.common.core.domain.R;
import com.justai.icp.common.domain.system.domain.SysMessageUser;
import com.justai.icp.common.domain.system.domain.SysUser;
import com.justai.icp.common.redis.service.RedisService;
import com.justai.icp.feign.remote.system.RemoteUserService;
import com.justai.icp.sms.dto.SysMsgDto;
import com.justai.icp.sms.enums.MessageStatusEnum;
import com.justai.icp.sms.service.IMessageDataService;
import com.justai.icp.sms.utils.CMSSmsUtil;
import com.tongtech.jms.FileMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.*;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;import static com.justai.icp.common.domain.system.enums.MessageTypeEnum.*;@Slf4j
@Component
public class MsgConsumer {private final ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("thread-call-runner-%d").build();@Autowiredprivate IMessageDataService messageDataService;@Autowiredprivate RedisService redisService;@Autowiredprivate RemoteUserService remoteUserService;@Value("${tlq.remoteServer}")private String tlqRemoteServer;private static final String TCF = "tongtech.jms.jndi.JmsContextFactory";private static final String REMOTE_URL = "tlq://";private static final String REMOTE_FACTORY = "RemoteConnectionFactory";private static final String QUEUE_NAME = "MyQueue";private Connection connection;com.tongtech.tmqi.QueueConnectionFactory queueConnectionFactory = null;//==连接类QueueConnection queueConnection = null;//==会话类QueueSession queueSession = null;//==打开的队列Queue queue = null;//==消费者QueueReceiver queueReceiver = null;/*** 容器启动后自动执行*/@PostConstructpublic void startListener() {try {queueConnectionFactory = new com.tongtech.tmqi.QueueConnectionFactory();queueConnectionFactory.setProperty("tmqiAddressList", "tlkq://10.10.12.85:10024");//==创建Connection和SessionqueueConnection = queueConnectionFactory.createQueueConnection();queueSession = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);Queue queue = queueSession.createQueue("lq");connection = queueConnectionFactory.createConnection();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);MessageConsumer consumer = session.createConsumer(queue);// 注册监听consumer.setMessageListener(new MessageListenerImpl());connection.start();log.info("TongLinkQ 消息监听器已启动,监听队列:{}", QUEUE_NAME);} catch (Exception e) {e.printStackTrace();log.error("启动 TongLinkQ 监听器失败", e);}}/*** 容器关闭时释放连接*/@PreDestroypublic void stop() {try {if (connection != null) {connection.close();log.info("TongLinkQ 连接已关闭");}} catch (JMSException e) {log.error("关闭 TongLinkQ 连接异常", e);}}/*** 真正的消息处理类*/private class MessageListenerImpl implements MessageListener {@Overridepublic void onMessage(Message message) {try {if (message instanceof TextMessage) {log.info("收到 Text 消息: {}", ((TextMessage) message).getText());} else if (message instanceof MapMessage) {String type = ((MapMessage) message).getString("type");String msg = ((MapMessage) message).getString("msg");String msgId = ((MapMessage) message).getString("msgId");log.info("收到 Map 消息: {}", msg);dealMsg(type,msg);} else if (message instanceof FileMessage) {log.info("收到 File 消息");} else {log.info("收到其他类型消息: {}", message.getClass().getSimpleName());}} catch (JMSException e) {log.error("处理消息异常", e);}}}private void dealMsg(String type,String msg){if (type.equals(String.valueOf(INTERNAL))){}else if (type.equals(String.valueOf(MAIL))){}else if (type.equals(String.valueOf(SMS))){dealSmsMsgForYC(msg);}else {}}public void dealSmsMsgForYC(String msg){System.out.println("deal sms msg");log.info("deal sms msg");try {SysMsgDto msgDto = JSON.parseObject(msg, SysMsgDto.class);String redisKey = "msglock:" + msgDto.getId();//幂等性处理,避免重复消费Boolean lock = redisService.redisTemplate.opsForValue().setIfAbsent(redisKey, Duration.ofSeconds(300));if (Boolean.TRUE.equals(lock)){System.out.println("进入分布式锁");try{//获取消息状态 (推送状态:1未处理;2已推送;3已入队;4已完成;5异常)int pushStatus = messageDataService.getMsgPushStatus(msgDto.getId());System.out.println("判断状态"+pushStatus);if (pushStatus != MessageStatusEnum.COMPLETED.getStatus()){//获取消息-用户List<SysMessageUser> messageUsers = getSysMessageUsers(msgDto);//获取用户信息列表R<List<SysUser>> listR = remoteUserService.getUserListByIds(new HashSet<>(msgDto.getReceiveUser()));System.out.println(JSON.toJSONString(listR));log.info("get user list:{}",JSON.toJSONString(listR));if (listR.isSuccess()){List<SysUser> userList = listR.getData();//收集邮箱列表List<String> phoneList = userList.parallelStream().map(SysUser::getPhonenumber).collect(Collectors.toList());//发送邮件for (String phone : phoneList){String content = msgDto.getMessageData();// 发送短信int result = 0;try{result = CMSSmsUtil.sendBatchMessage(phone, content);} catch (IOException e){throw new RuntimeException(e);}// 根据短信发送状态返回参数if( result == 1 ){System.out.println("手机号码" +phone + "短信发送成功!");log.info("手机号码" +phone + "短信发送成功!");} else {System.out.println("手机号码" +phone + "短信发送失败!");log.error("手机号码" +phone + "短信发送失败,请稍后再试!");}}}System.out.println("发送完成");//保存messageDataService.saveMsgUserBatch(messageUsers);//修改消息状态messageDataService.updateMsgStatus(String.valueOf(msgDto.getId()),MessageStatusEnum.COMPLETED);}}catch (Exception e){log.error(e.getMessage());e.printStackTrace();}finally{//释放锁redisService.deleteObject(redisKey);}}} catch (Exception e) {e.printStackTrace();//手动应答log.error("邮件发送失败:" + e.getMessage());}}/*** 收集数据* @param msgDto 消息对象* @return 消息用户集合*/private List<SysMessageUser> getSysMessageUsers(SysMsgDto msgDto){List<String> receiveUserList = msgDto.getReceiveUser();//收集要发送的用户信息List<SysMessageUser> messageUsers = new ArrayList<>();for (String receiveUserId : receiveUserList){SysMessageUser messageUser = new SysMessageUser();messageUser.setMessageId(msgDto.getId());messageUser.setUserId(receiveUserId);messageUser.setMsgType(msgDto.getMessageType());messageUser.setStatus("1");messageUsers.add(messageUser);}return messageUsers;}
}
http://www.dtcms.com/a/340480.html

相关文章:

  • AI 视频翻译工具的调研笔记
  • Spring Boot 实战:从项目搭建到部署优化
  • Causal-Copilot: An Autonomous Causal Analysis Agent 论文解读
  • 基于离散点集的三次样条插值与符号表达式构建:从 Scipy 到 Sympy 的完整实现
  • 记一次前端Vue3+Vite+TS项目中使用Mock.js + vite-plugin-mock插件发布到生产(线上)环境填坑汇总
  • 矩阵的特征分解
  • C语言---分隔符、常量、注释、标识符、关键字、空格
  • LoRa 网关组网方案(二)
  • 【科研绘图系列】R语言绘制平行坐标图
  • 保姆级Debezium抽取SQL Server同步kafka
  • 绕过 C 标准库限制执行系统命令:系统调用、Shellcode 和裸机二进制
  • week2-[一维数组]出现次数
  • css中的性能优化之content-visibility: auto
  • InfluxDB 查询性能优化实战(二)
  • 【解决方案】powershell自动连接夜神adb端口
  • 手撕线程池
  • AI 伦理的 “灰色地带”:当算法拥有决策权,公平与隐私该如何平衡?
  • C# NX二次开发:面收集器控件和曲线收集器控件详解
  • 边缘智能体:Go编译在医疗IoT设备端运行轻量AI模型(下)
  • DAY 51 复习日
  • Redis 复制功能是如何工作的
  • Android 开发问题:android:marginTop=“20px“ 属性不生效
  • 多系统 Node.js 环境自动化部署脚本:从 Ubuntu 到 CentOS,再到版本自由定制
  • 云原生俱乐部-k8s知识点归纳(5)
  • 自动化测试用例生成:基于Python的参数化测试框架设计与实现
  • MeterSphere断言操作
  • 多肽修饰——胆固醇(chol)
  • B站 XMCVE Pwn入门课程学习笔记(7)
  • sigmastar设备树引脚复用研究
  • 《GPT-OSS 模型全解析:OpenAI 回归开源的 Mixture-of-Experts 之路》