当前位置: 首页 > wzjs >正文

怎么经营团购网站电商网站改版

怎么经营团购网站,电商网站改版,北京菜谱设计制作,建筑课程网站背景 因为安装了正向隔离网闸&#xff0c;导致数据传输的时候仅支持TCP协议和UDP协议&#xff0c;因此需要开发TCP Client和Server服务来将数据透传&#xff0c;当前环境是获取的数据并将数据转发到kafka 1.引入依赖 <dependency><groupId>io.netty</groupId>…

背景

因为安装了正向隔离网闸,导致数据传输的时候仅支持TCP协议和UDP协议,因此需要开发TCP Client和Server服务来将数据透传,当前环境是获取的数据并将数据转发到kafka

 1.引入依赖

<dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.84.Final</version>
</dependency>

2.编写TCP Server端 

TCP Server代码

本代码已经解决TCP的粘包和半包问题,需要通过固定的$符号进行数据分割,使得数据不会错出现粘包和半包问题,可以根据数据大小制定一个不会超过发送消息长度的值

 

package com.huanyu.forward.tcp.server;import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression;
import org.springframework.stereotype.Service;import javax.annotation.PostConstruct;@Slf4j
@Service("tcpServer")
@ConditionalOnExpression("#{'${spring.tcp-server.port:}'.length()>0}")
public class TcpNettyServer {@Value("${spring.tcp-server.port:22222}")private Integer port;public static void main(String[] args) throws Exception {new TcpNettyServer().server(22222);}@PostConstruct()public void initTcpServer() {try {log.info("start tcp server......");server(port);} catch (Exception e) {log.error("tcp server start failed");}}public void server(int port) throws Exception {//bossGroup就是parentGroup,是负责处理TCP/IP连接的EventLoopGroup bossGroup = new NioEventLoopGroup();//workerGroup就是childGroup,是负责处理Channel(通道)的I/O事件EventLoopGroup workerGroup = new NioEventLoopGroup();ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(1, 1);buffer.writeByte('$');ServerBootstrap sb = new ServerBootstrap();//初始化服务端可连接队列,指定了队列的大小500sb.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100)//保持长连接.childOption(ChannelOption.SO_KEEPALIVE, true)// 绑定客户端连接时候触发操作.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel sh) throws Exception {//handler是按顺序执行的ChannelPipeline pipeline = sh.pipeline();//业务编码 -解决 数据粘包和半包问题-pipeline.addLast(new DelimiterBasedFrameDecoder(1024 * 1024 * 10, buffer));
//                        pipeline.addLast(new LoggingHandler(LogLevel.WARN));pipeline.addLast(new TcpBizFlagHandler());//业务编码//使用DataHandler类来处理接收到的消息pipeline.addLast(new TcpDataHandler());}});//绑定监听端口,调用sync同步阻塞方法等待绑定操作完ChannelFuture future = sb.bind(port).sync();if (future.isSuccess()) {log.info("tcp server is listening on  :{}", port);} else {log.error("tcp server is failed ", future.cause());//关闭线程组bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}//成功绑定到端口之后,给channel增加一个 管道关闭的监听器并同步阻塞,直到channel关闭,线程才会往下执行,结束进程。
//        future.channel().closeFuture().await();}
}

 数据标志位接收代码

package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;import java.nio.charset.StandardCharsets;
import java.util.List;@Slf4j
public class TcpBizFlagHandler extends ByteToMessageDecoder {public static final String BIZ_FLAG = "bizFlag";private static final String FLAG_PRE = "@@{";private static final String FLAG_SUF = "}##";private static final byte[] FLAG_PREFIX = FLAG_PRE.getBytes(StandardCharsets.UTF_8);private static final byte[] FLAG_SUFFIX = FLAG_SUF.getBytes(StandardCharsets.UTF_8);@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {if (in.readableBytes() < FLAG_PREFIX.length + FLAG_SUFFIX.length) {log.warn("数据长度不够");text(in);return;}int prefixIndex = in.readerIndex();if (!startsWith(in)) {text(in);// 忽略非标志位开头的数据in.skipBytes(in.readableBytes());log.warn("数据不包含指定的前缀");return;}int suffixIndex = indexOf(in);if (suffixIndex == -1) {log.warn("数据不包含指定的某字符");text(in);return;}int flagLength = suffixIndex - prefixIndex + FLAG_SUFFIX.length;byte[] flagBytes = new byte[flagLength];in.readBytes(flagBytes); // 读取标志位// 保留标志位的对象结构-以@@{开头以}##结尾,形如@@{"k":"v"}##{"k":"v"}$,@@和##之间的数据为补充的对象参数JSON,$为换行符号String flag = new String(flagBytes, FLAG_PRE.length() - 1, flagBytes.length - FLAG_PREFIX.length - FLAG_SUFFIX.length + 2, StandardCharsets.UTF_8);// 保存标志位到 Channel 属性中供后续使用ctx.channel().attr(AttributeKey.valueOf(BIZ_FLAG)).set(flag);// 剩余数据继续传递给下一个 Handler 处理(透传)out.add(in.readRetainedSlice(in.readableBytes()));}private static void text(ByteBuf in) {byte[] msgByte = new byte[in.readableBytes()];in.readBytes(msgByte);log.warn("数据:{}", new String(msgByte, StandardCharsets.UTF_8));}private boolean startsWith(ByteBuf buf) {for (int i = 0; i < TcpBizFlagHandler.FLAG_PREFIX.length; i++) {if (buf.getByte(buf.readerIndex() + i) != TcpBizFlagHandler.FLAG_PREFIX[i]) {return false;}}return true;}private int indexOf(ByteBuf buf) {int readerIndex = buf.readerIndex();int readableBytes = buf.readableBytes();for (int i = 0; i <= readableBytes - TcpBizFlagHandler.FLAG_SUFFIX.length; i++) {boolean match = true;for (int j = 0; j < TcpBizFlagHandler.FLAG_SUFFIX.length; j++) {if (buf.getByte(readerIndex + i + j) != TcpBizFlagHandler.FLAG_SUFFIX[j]) {match = false;break;}}if (match) {return readerIndex + i;}}return -1;}
}

业务转发/解析代码 

package com.huanyu.forward.tcp.server;import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.AttributeKey;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.util.ObjectUtils;import static com.aimsphm.forward.tcp.server.TcpBizFlagHandler.BIZ_FLAG;@Slf4j
@Service
public class TcpDataHandler extends ChannelInboundHandlerAdapter {//    @Resourceprivate KafkaTemplate<String, Object> template;//接受client发送的消息@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {Channel channel = ctx.channel();// 获取标志位String flag = (String) channel.attr(AttributeKey.valueOf(BIZ_FLAG)).get();if (ObjectUtils.isEmpty(flag)) {log.warn("没有业务标识");return;}ByteBuf buf = (ByteBuf) msg;byte[] msgByte = new byte[buf.readableBytes()];buf.readBytes(msgByte);
//        template.send("haha.haha.ha", gbk.getBytes());log.info("bizFag:{},data: {}", flag, new String(msgByte));}//通知处理器最后的channelRead()是当前批处理中的最后一条消息时调用@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.flush();}//读操作时捕获到异常时调用@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {ctx.close();}//客户端去和服务端连接成功时触发@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.copiedBuffer("hello client [你好,客户端]".getBytes()));log.info("client 连接成功: {}", ctx.channel());}
}

3.编写客户端代码

TCP Client 代码

package com.huanyu.forward.tcp.client;import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;import java.util.stream.IntStream;@Getter
@Slf4j
public class TcpNettyClient {public static void main(String[] args) {extracted();}private static void extracted() {try {TcpNettyClient client = new TcpNettyClient("localhost", 4444);Channel channel = client.getChannel();IntStream.range(0, 1000).parallel().forEach(i -> {ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();buf.writeBytes(("@@{\"cell-topic" + (i + 1) + "\":true}##{01#.01#\":\"data1\"}").getBytes());buf.writeByte('$');channel.writeAndFlush(buf);});} catch (Exception e) {log.error("出现异常:", e);}}private Channel channel;//连接服务端的端口号地址和端口号public TcpNettyClient(String host, int port) {tcpClient(host, port);}public void tcpClient(String host, int port) {try {final EventLoopGroup group = new NioEventLoopGroup();Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class)  // 使用NioSocketChannel来作为连接用的channel类.handler(new ChannelInitializer<SocketChannel>() { // 绑定连接初始化器@Overridepublic void initChannel(SocketChannel ch) throws Exception {System.out.println("正在连接中...");ChannelPipeline pipeline = ch.pipeline();pipeline.addLast(new TcpClientHandler()); //客户端处理类}});//发起异步连接请求,绑定连接端口和host信息final ChannelFuture future = b.connect(host, port).sync();future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture arg0) throws Exception {if (future.isSuccess()) {log.info("连接服务器成功:");} else {log.warn("连接服务器失败:");System.out.println("连接服务器失败");group.shutdownGracefully(); //关闭线程组}}});this.channel = future.channel();} catch (InterruptedException e) {log.error("TCP服务端启动异常:", e);}}}

 客户端数据解析代码

package com.huanyu.forward.tcp.client;import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;import java.util.Map;public class TcpClientHandler extends SimpleChannelInboundHandler<Map<String, ByteBuf>> {//处理服务端返回的数据@Overrideprotected void channelRead0(ChannelHandlerContext ctx, Map<String, ByteBuf> data) throws Exception {ByteBuf msg = data.get("topic");byte[] msgByte = new byte[msg.readableBytes()];msg.readBytes(msgByte);System.out.println("接受到server响应数据: " + new String(msgByte));}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {
//        ctx.writeAndFlush(Unpooled.copiedBuffer("hello server 你好".getBytes()));super.channelActive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();}
}

 备注

1. 为了尽可能的降低性能消耗,数据以字节数组的形式发送

2. 业务字段通过@@{"key":"value"}##作为消息的头部,用数据标志位处理器进行处理

3. 真实要传送的数据,并不解析出来,并以$结尾,解决粘包和半包问题

记录备查


文章转载自:

http://HWGHKMlm.dmfdL.cn
http://9uvBONek.dmfdL.cn
http://K44iM5og.dmfdL.cn
http://mPBjnix4.dmfdL.cn
http://wieC07zE.dmfdL.cn
http://c56Wdwrm.dmfdL.cn
http://sL2aDhBp.dmfdL.cn
http://5IpdGn7A.dmfdL.cn
http://I9xZ7Lp5.dmfdL.cn
http://rnu50n3E.dmfdL.cn
http://65Pie4Fu.dmfdL.cn
http://xIInZEse.dmfdL.cn
http://1ojWEfTx.dmfdL.cn
http://JBYcDv12.dmfdL.cn
http://a17D1VdS.dmfdL.cn
http://WX5lF2oG.dmfdL.cn
http://2cpMFS6y.dmfdL.cn
http://TYaWDf5h.dmfdL.cn
http://RfgJ6O8D.dmfdL.cn
http://aiAoGrfW.dmfdL.cn
http://zVbL4GoE.dmfdL.cn
http://b5tnJlLj.dmfdL.cn
http://eqJ4dOaU.dmfdL.cn
http://zswGD8NO.dmfdL.cn
http://URbvvYxN.dmfdL.cn
http://OB9h75jg.dmfdL.cn
http://GhZeIK0b.dmfdL.cn
http://Fvgc0SsZ.dmfdL.cn
http://iqI0BTRE.dmfdL.cn
http://FBgRJYNx.dmfdL.cn
http://www.dtcms.com/wzjs/689965.html

相关文章:

  • 58同城网站建设目的个人养老缴费明细查询
  • asp和php网站的区别小语言网站建设
  • 绝味鸭脖网站建设规划书宿州商务网站建设
  • 网站备案网站名称网站开发专业就业前景分析
  • 宁波住房和建设局网站怎么做蛋糕
  • 微网站难做么微信怎么推广
  • 文化传媒公司 网站备案跨境电商无货源模式怎么做
  • 郑州seo网站推广桂林市建设局网站
  • 怎样成立网站dw软件怎么用怎么做网页
  • 哪里可以做拍卖网站最新新闻热点国家大事
  • 网站备案用的幕布可以淘宝做吗网站显示系统建设中
  • 阿里云对象存储做静态网站网站建设 名词解释
  • 视频网站开发与制作wordpress 改成 中文
  • 门户网站建设厂商名录无广告自助建站
  • 开源网站系统信阳哪里做网站
  • 四川省建设注册资格中心网站网上电影网站怎么做的
  • 现在做什么网站好温州网站建设制作
  • 如何查看网站的空间做一个简单的网站
  • 做网站第三方登录怎么弄做一个平台网站要多少钱
  • 怎么给网站做懒加载做网站泰州
  • 网站举报入口做网站如何组建域名
  • 淄博张店做网站的公司中材矿山建设有限公司网站
  • 怎么建设国外免费网站公司如何建站
  • 网站单向外链推广工具微网站如何做微信支付
  • 开发做一个网站的流程多用户商城网站开发
  • 网站首页包含的内容网站建设要学哪些软件有哪些方面
  • 大型网站建设培训课件网站关键词多少合适
  • 山东建设企业网站网站托管做的好的公司
  • html网页设计作业源代码福州网站设计十年乐云seo
  • jsp购物网站开发教程wordpress 文章 两边