聊聊基于tcp的应用层消息边界如何定义

背景

2018年笔者有幸接触一个项目要用到长连接实现云端到设备端消息推送,所以借机了解过相关的内容,最终是通过rabbitmq+mqtt实现了相关功能,同时在心里也打了一个问号“如果自己实现长连接框架,该怎么定义消息的边界呢?”,之后断断续续整理了一些,一直不成体系,最近放假了整理出来跟大家交流一番。

为什么需要消息边界

消息边界并非长连接场景才需要,即使是短连接也可能需要,拿我们比较常用的http1.0协议(http1.1稍微复杂一些,后面会单独说)来说,它基于tcp这个传输协议来传递消息,而tcp协议又是一个面向流的协议,怎么能识别出已经到了流的末尾呢?我们需要一种规则来定义消息的边界,告诉对方读取已经到了末尾,可以结束了。

举一个生活中的例子来帮助理解,2020年由于疫情的原因,平日里都是在线下会议室开会,特殊时期演变成了线上会议。不知道大家有没有遇到过这种情况,线下开会时通过观察别人的动作、神情很容易知道他说完了,这时候下一个人就可以接着发言了,但是线上开会时这样就行不通了,你如果想发言是不是得先确认下别人有没有说完,如果直接发言可能会打断别人,这样很不礼貌,为什么会出现这种情况呢?因为你不知道他到底有没有结束发言,更专业一点说你不知道是否到达了消息的边界。那怎么改进呢,如果每个人发言完毕都显示的告诉别人“我说完了”,是不是会好一些呢,“我说完了”这四个字就是一种消息的边界,给接收方传达一种消息结束的讯息。

TCP层面的分析

本节来源于https://netty.io/wiki/user-guide-for-4.x.html#wiki-h3-10

在基于流的传输(例如TCP / IP)中,将接收到的数据存储到套接字接收缓冲区中。不幸的是,基于流的传输的缓冲区不是数据包队列而是字节队列。这意味着,即使您将两个消息作为两个独立的数据包发送,操作系统也不会将它们视为两个消息,而只是一堆字节。因此,不能保证读取的内容与远端写的完全一样。例如,假设操作系统的TCP / IP栈已收到三个数据包:

由于是基于流的协议,因此很有可能在应用程序中读到以下四个分段:

因此,无论是服务器端还是客户端,接收方都应将接收到的数据整理到一个或多个有意义的帧中,以使应用程序逻辑易于理解。在上面的示例中,正确的数据应采用以下格式:

消息边界的种类

前面介绍了消息边界的定义以及作用,这一节我们来看看大概会有哪几种消息边界。

1.特殊字符:比如上面提到的“我说完了”这就是一种特殊字符作为消息边界的例子,以特殊字符为边界的典型产品有我们熟知的redis,客户端和服务器发送的命令或数据一律以 \r\n (CRLF)结尾,还有Netty中的DelimiterBasedFrameDecoder。

2.基于消息长度:比如约定了消息长度为4k字节,接收方每次读取4k字节以后就认为已到达消息边界,结束本次读取。当然现实中消息长度一般是变长的,这样就需要设计一个约定好的消息头部,将消息长度作为头部的一部分传输过去,以长度为边界的例子有Dubbo、http

、websocket,Netty中的FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder等。

                  附上一张dubbo协议头,供大家体会

redis如何解析完整消息

上面说过,redis是通过\r\n来作为消息边界的,下面我将从源码角度分析下redis具体是如何处理的。
1.这里通过telnet来发送内联格式命令请求redis,之所以没有选用redis-cli是想模拟一条指令redis-server分多次收到的情况,在telnet模式下,每输入一个字符,就会发送给redis-server端,而redis-cli不是,它是按下回车时才会发送整体输入的命令,redis-server端是分多次还是一次收到完整的命令,这个取决于底层,如果想模拟分多次收到,这个过程较为复杂。

2.redis-server端每次有输入时会触发readQueryFromClient(networking.c)函数,对redis执行流程感兴趣的可以参考我之前的文章“redis源码学习之工作流程初探”。

3.redis-server将收到的内容暂存到redisClient的querybuf中,如果没有收到\r\n就等待,直到收到\r\n才将querybuf中的内容解析成指令执行。

测试步骤如下:

  • telnet 中输入g

  • debug查看redisClient中querybuf的值,目前只有g

  • telnet中输完get a按回车以后,redisClient中querybuf保存了所有的输入get a \r\n

源码分析如下:

readQueryFromClient

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
int nread, readlen;
size_t qblen;
REDIS_NOTUSED(el);
REDIS_NOTUSED(mask); server.current_client = c;
readlen = REDIS_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == REDIS_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= REDIS_MBULK_BIG_ARG)
{
int remaining = (int)((unsigned)(c->bulklen+2)-sdslen(c->querybuf)); if (remaining < readlen) readlen = remaining;
} qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); //从fd中读取内容,读取的内容存到redisClient的querybuf中
nread = read(fd, c->querybuf+qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
nread = 0;
} else {
#ifdef _WIN32
redisLog(REDIS_VERBOSE, "Reading from client: %s",wsa_strerror(errno));
#else
redisLog(REDIS_VERBOSE, "Reading from client: %s",strerror(errno));
#endif
freeClient(c);
return;
}
} else if (nread == 0) {
redisLog(REDIS_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
#ifdef WIN32_IOCP
aeWinReceiveDone(fd);
#endif
if (nread) {
sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & REDIS_MASTER) c->reploff += nread;
} else {
server.current_client = NULL;
return;
}
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = getClientInfoString(c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64);
redisLog(REDIS_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClient(c);
return;
} //正常的读取,继续执行processInputBuffer
processInputBuffer(c);
server.current_client = NULL;
}

processInputBuffer

void processInputBuffer(redisClient *c) {
/* Keep processing while there is something in the input buffer */
while(sdslen(c->querybuf)) {
/* Immediately abort if the client is in the middle of something. */
if (c->flags & REDIS_BLOCKED) return; /* REDIS_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands). */
if (c->flags & REDIS_CLOSE_AFTER_REPLY) return; /* Determine request type when unknown. */
//判断协议类型,如果是*开头的就是redis的统一请求协议,否则就是内联协议
if (!c->reqtype) {
if (c->querybuf[0] == '*') {
c->reqtype = REDIS_REQ_MULTIBULK;
} else {
c->reqtype = REDIS_REQ_INLINE;
}
} //走内联协议的处理函数processInlineBuffer
if (c->reqtype == REDIS_REQ_INLINE) {
//如果命令不完整或者解析失败,不会执行命令
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
} else {
redisPanic("Unknown request type");
} /* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* Only reset the client when the command was executed. */
//命令解析完成,执行具体的命令对应的函数
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}

processInlineBuffer

int processInlineBuffer(redisClient *c) {
char *newline;
int argc, j;
sds *argv, aux;
size_t querylen; /* Search for end of line */
newline = strchr(c->querybuf,'\n'); /* Nothing to do without a \r\n */
//最后一个字符不是\n,返回REDIS_ERR,说明命令不完整,继续等待
if (newline == NULL) {
if (sdslen(c->querybuf) > REDIS_INLINE_MAX_SIZE) {
addReplyError(c,"Protocol error: too big inline request");
setProtocolError(c,0);
}
return REDIS_ERR;
} /* Handle the \r\n case. */
//继续判断是否是以\r\n结尾的,如果是就截取\r\n前面的内容为参数
if (newline && newline != c->querybuf && *(newline-1) == '\r')
newline--; /* Split the input buffer up to the \r\n */
querylen = newline-(c->querybuf);
aux = sdsnewlen(c->querybuf,querylen);
argv = sdssplitargs(aux,&argc);
sdsfree(aux);
if (argv == NULL) {
addReplyError(c,"Protocol error: unbalanced quotes in request");
setProtocolError(c,0);
return REDIS_ERR;
} /* Newline from slaves can be used to refresh the last ACK time.
* This is useful for a slave to ping back while loading a big
* RDB file. */
if (querylen == 0 && c->flags & REDIS_SLAVE)
c->repl_ack_time = server.unixtime; /* Leave data after the first line of the query in the buffer */
sdsrange(c->querybuf,querylen+2,-1); /* Setup argv array on client structure */
if (c->argv) zfree(c->argv);
c->argv = zmalloc(sizeof(robj*)*argc); /* Create redis objects for all arguments. */
for (c->argc = 0, j = 0; j < argc; j++) {
if (sdslen(argv[j])) {
c->argv[c->argc] = createObject(REDIS_STRING,argv[j]);
c->argc++;
} else {
sdsfree(argv[j]);
}
}
zfree(argv);
return REDIS_OK;
}

Netty FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder如何解析完整消息

有兴趣的小伙伴可以看看FixedLengthFrameDecoder、LengthFieldBasedFrameDecoder源码的java doc说明,里面讲的比较详细,在此不再重复。

总结

网络上其他作者将这类问题称之为TCP“粘包”和“拆包”,与本文提到的消息边界本质上没有太多区别,之所以没有继续叫“拆包”是不想把概念复杂化,回到本质其实就是需要一种机制来定义消息的边界,帮助应用层来正确的解析消息。

通过redis源码的简单分析,大体可以得到解决这类问题的关键点有以下两步:
1.需要一种边界的定义,基于特殊字符、基于长度等;

2.消息接收端需要暂存收到的内容,不到边界时等待,直到符合边界条件(收到了特殊字符或者收到的字节数达到约定的长度)。

虽说不是一个高大上的知识点,但是通过查资料和阅读源码也解决了心中的困惑,过程中通过发散式的学习也了解到Netty框架针对这类问题的解决方案,算是对Netty的认识又深入了一点。

基于tcp的应用层消息边界如何定义的更多相关文章

  1. 构建基于TCP的应用层通信模型

    各层的关系如下图,表述的是两个应用或CS间通信的过程:   通常使用TCP构建应用时,需要考虑传输层的通信协议,以便应用层能够正确识别消息请求.比如,一个请求的内容很长(如传文件),那肯定要分多次发送 ...

  2. TCP和UDP的&quot;保护消息边界&quot; (经典)

    在socket网络程序中,TCP和UDP分别是面向连接和非面向连接的.因此TCP的socket编程,收发两端(客户端和服务器端)都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有 ...

  3. 基于TCP/IP协议的C++网络编程(API函数版)

    源代码:http://download.csdn.net/detail/nuptboyzhb/4169959 基于TCP/IP协议的网络编程 定义变量——获得WINSOCK版本——加载WINSOCK库 ...

  4. 介绍开源的.net通信框架NetworkComms框架之四 消息边界

    原文网址: http://www.cnblogs.com/csdev Networkcomms 是一款C# 语言编写的TCP/UDP通信框架  作者是英国人  以前是收费的 目前作者已经开源  许可是 ...

  5. UDP TCP 消息边界

    先明确一个问题,如果定义了一个数据结构,大小是,比方说 32 个字节,然后 UDP 客户端连续向服务端发了两个包.现在假设这两个包都已经到达了服务器,那么服务端调用 recvfrom 来接收数据,并且 ...

  6. Mina、Netty、Twisted一起学(二):TCP消息边界问题及按行分割消息

    在TCP连接开始到结束连接,之间可能会多次传输数据,也就是服务器和客户端之间可能会在连接过程中互相传输多条消息.理想状况是一方每发送一条消息,另一方就立即接收到一条,也就是一次write对应一次rea ...

  7. TCP和UDP的保护消息边界机制

    在socket网络程序中,TCP和UDP分别是面向连接和非面向连接的.TCP的socket编程,收发两端都要有一一成对的socket,因此,发送端为了将多个发往接收端的包,更有效的发到对方,使用了优化 ...

  8. TCP和UDP的&quot;保护消息边界”

    转自:http://blog.csdn.net/zhangxinrun/article/details/6721427 在socket网络程序中,TCP和UDP分别是面向连接和非面向连接的.因此TCP ...

  9. Fixed-Length Frames 谈谈网络编程中应用层(基于TCP/UDP)的协议设计

    http://blog.sina.com.cn/s/blog_48d4cf2d0101859x.html 谈谈网络编程中应用层(基于TCP/UDP)的协议设计 (2013-04-27 19:11:00 ...

  10. 【转】TCP协议的无消息边界问题

    http://www.cnblogs.com/eping/archive/2009/12/12/1622579.html   使用TCP协议编写应用程序时,需要考虑一个问题:TCP协议是无消息边界的, ...

随机推荐

  1. Denormalization

    Denormalization In computing, denormalization is the process of attempting to optimize the read perf ...

  2. mysql lower,upper实现大小写

    mysql的lower和uppper函数可以将指定字符串转换为小写和大写 select lower('OutSpringTd') as lowerCase, upper('OutSpringTd') ...

  3. JVM的stack和heap,JVM内存模型,垃圾回收策略,分代收集,增量收集

    (转自:http://my.oschina.net/u/436879/blog/85478) 在JVM中,内存分为两个部分,Stack(栈)和Heap(堆),这里,我们从JVM的内存管理原理的角度来认 ...

  4. NoSql数据库使用半年后在设计上面的一些心得 (转)

    http://www.cnblogs.com/AllenDang/p/3507821.html NoSql数据库这个概念听闻许久了,也陆续看到很多公司和产品都在使用,优缺点似乎都被分析的清清楚楚.但我 ...

  5. SweetAlert 使用

    $(".delete").click(function(){ var work_name = $(this).data('name'); var item_id = $(this) ...

  6. BSTR、char*和CString转换

    (1) char*转换成CString 若将char*转换成CString,除了直接赋值外,还可使用CString::Format进行.例如: char chArray[] = "This  ...

  7. linux学习: sudo命令(ubuntu)

    使用 sudo 命令可以提高命令的执行权限,以root权限执行 如 :  sudo vi xxx 但是有些内置命令 如 cd 无法通过 sudo来执行 ,如  sudo cd xxx 这是会报错的. ...

  8. BigInteger详解

    在Java中有两个类BigInteger和BigDecimal分别表示大整数类和大浮点数类,理论上能够表示无线大的数,只要计算机内存足够大. 这两个类都在 java.math.* 包中,因此每次必须在 ...

  9. Ngx_Lua使用分享

    2017年04月22日 20:05:21 阅读数:430 Nginx_Lua 1.1. 介绍 1.2. 安装 1.2.1. 安装JIT平台 1.2.2. NDK与Lua_module 1.2.3. 编 ...

  10. Spark Streaming 例子

    NetworkWordCount.scala /* * Licensed to the Apache Software Foundation (ASF) under one or more * con ...