从0到1的弹幕系统——实现超简单的websocket协议
haoteby 2024-12-27 13:32 6 浏览
前言
本系列为Golang实战,从0到1实现一个弹幕系统。在开发的过程中,会实现一些现成的轮子,只是为了学习用,实际使用时,还是使用现成的轮子。
现在直播行业大火,有斗鱼、虎牙等这种大型直播平台,也有带货直播、电商直播等等,这些直播都会有弹幕系统,没有弹幕的直播是没有灵魂的。不止是直播类,你在各大视频网站看视频时,不开弹幕吗?!!有时弹幕比视频内容更好看。
弹幕系统的使用场景非常广泛,其主要特点是实时性高、并发量大,尤其是并发量。在直播场景中,往往一个大主播,一场直播下来百万弹幕轻轻松松。由于golang在并发场景中的优越表现,我们选择使用golang来实现弹幕系统。
websocket协议实现
弹幕系统肯定绕不开websocket协议,使用弹幕的系统基本都会有H5应用,H5应用上的弹幕系统,大家想到的肯定是用websocket实现。
各大语言都有很多现成的websocket类库,例如nodeJS的socket.io,PHP的swoole等等,项目的一开始我们不使用这些现成的库,我们自己实现一个简单的websocket协议,在实现过程中学习websocket协议。
websocket协议在RFC6455中详细描述,有大神翻译的中文版了。
闲话不多说,下面开始动手。
websocket在建立握手时,是基于HTTP服务的,我们先启动一个HTTP服务:
func main() {
http.HandleFunc("/echo", func(writer http.ResponseWriter, request *http.Request) {
serveWs(writer, request)
})
err := http.ListenAndServe("localhost:9527", nil)
if err != nil {
log.Fatal(err)
}
}
复制代码
我们先用一个echo服务来测试websocket协议的实现,端口为9527,函数serveWs的功能很简单,建立socket连接,读取信息并回写,即一个echo服务。
握手连接
RFC6455文档第4节详细介绍了握手建立的过程。我们照着文档一步步实现。
The method of the request MUST be GET, and the HTTP version MUST be at least 1.1.
HTTP的方法必须是GET方法:
if request.Method != "GET" {
return nil, fmt.Errorf("HTTP必须是GET方法")
}
复制代码
The request MUST contain an Upgrade header field whose value MUST include the "websocket" keyword.
必须要有Upgradeheader,它的值必须是websocket:
if !httpHeaderContainsValue(request.Header, "Upgrade", "websocket") {
return nil, fmt.Errorf("必须包含一个Upgrade header字段,它的值必须为websocket")
}
复制代码
等等一系列的校验,这里就不一一赘述了。
HTTP header验证完后,我们再来处理TCP连接,众所周知,HTTP是在TCP之上的应用层协议,正常情况下,一个HTTP请求结束后,TCP也会断开。websocket连接其实就是一个TCP连接,所以我们不能让这个HTTP中的这个TCP连接断开,我们自己来管理这个TCP连接。怎么获取HTTP中的TCP连接呢?Golang为我们提供了一个Hijack方法。
hijacker, ok := writer.(http.Hijacker)
if !ok {
return nil, fmt.Errorf("未实现http.Hijacker")
}
netConn, buf, err := hijacker.Hijack()
if err != nil {
return nil, err
}
复制代码
接收到客户端的握手请求后,我们必须响应前端的请求,才能完成整个握手的过程。
If the status code received from the server is not 101, the client handles the response per HTTP [RFC2616] procedures. In particular, the client might perform authentication if it receives a 401 status code; the server might redirect the client using a 3xx status code (but clients are not required to follow them), etc. Otherwise, proceed as follows.
我们只是简单实现一个websocket协议,这里直接设置状态码为101:
var response []byte
response = append(response, "HTTP/1.1 101 Switching Protocols\r\n"...)
复制代码
If the response lacks an |Upgrade| header field or the |Upgrade| header field contains a value that is not an ASCII case- insensitive match for the value "websocket", the client MUST Fail the WebSocket Connection.
响应中必须有Upgradeheader信息,且值必须为websocket(不区分大小写):
response = append(response, "Upgrade: websocket\r\n"...)
复制代码
If the response lacks a |Connection| header field or the |Connection| header field doesn't contain a token that is an ASCII case-insensitive match for the value "Upgrade", the client MUST Fail the WebSocket Connection.
必须有Connection且值必须为Connection:
response = append(response, "Connection: Upgrade\r\n"...)
复制代码
If the response lacks a |Sec-WebSocket-Accept| header field or the |Sec-WebSocket-Accept| contains a value other than the base64-encoded SHA-1 of the concatenation of the |Sec-WebSocket- Key| (as a string, not base64-decoded) with the string "258EAFA5- E914-47DA-95CA-C5AB0DC85B11" but ignoring any leading and trailing whitespace, the client MUST Fail the WebSocket Connection.
必须有Sec-WebSocket-Acceptheader,值要根据Sec-WebSocket-Key的值和"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"串联起来,忽略所有前后空格进行base64 SHA-1编码得到:
var acceptKeyGUID = []byte("258EAFA5-E914-47DA-95CA-C5AB0DC85B11")
generateAcceptKey(request.Header.Get("Sec-WebSocket-Key"))
func generateAcceptKey(key string) string {
h := sha1.New()
h.Write([]byte(key))
h.Write(acceptKeyGUID)
return base64.StdEncoding.EncodeToString(h.Sum(nil))
}
response = append(response, "Sec-WebSocket-Accept: "...)
response = append(response, generateAcceptKey(request.Header.Get("Sec-WebSocket-Key"))...)
response = append(response, "\r\n\r\n"...)
复制代码
其他的非必选信息我们就不添加了。header构造好后,响应给客户端:
if _, err = netConn.Write(response); err != nil {
netConn.Close()
}
复制代码
因为现在这个TCP连接是我们自己管理,所以当握手过程中出现失败时,需要我们自己关闭这个连接。
现在,我们已经完成了整个握手的过程。测试一下:
握手成功,接下来我们开始详细的收发处理。
读消息
在WebSocket协议中,数据是通过一系列数据帧来进行传输的,RFC6455中第5节详情介绍了数据帧。
基础数据帧格式:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
复制代码
摘录一些重要的信息:
FIN: 1 bit
Indicates that this is the final fragment in a message. The first fragment MAY also be the final fragment. 复制代码
RSV1, RSV2, RSV3: 1 bit each
MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_. 复制代码
Opcode: 4 bits
Defines the interpretation of the "Payload data". If an unknown opcode is received, the receiving endpoint MUST _Fail the WebSocket Connection_. The following values are defined. * %x0 denotes a continuation frame * %x1 denotes a text frame * %x2 denotes a binary frame * %x3-7 are reserved for further non-control frames * %x8 denotes a connection close * %x9 denotes a ping * %xA denotes a pong * %xB-F are reserved for further control frames 复制代码
Mask: 1 bit
Defines whether the "Payload data" is masked. If set to 1, a masking key is present in masking-key, and this is used to unmask the "Payload data" as per Section 5.3. All frames sent from client to server have this bit set to 1. 复制代码
Payload length: 7 bits, 7+16 bits, or 7+64 bits
The length of the "Payload data", in bytes: if 0-125, that is the payload length. If 126, the following 2 bytes interpreted as a 16-bit unsigned integer are the payload length. If 127, the following 8 bytes interpreted as a 64-bit unsigned integer (the most significant bit MUST be 0) are the payload length. Multibyte length quantities are expressed in network byte order. Note that in all cases, the minimal number of bytes MUST be used to encode the length, for example, the length of a 124-byte-long string can't be encoded as the sequence 126, 0, 124. The payload length is the length of the "Extension data" + the length of the "Application data". The length of the "Extension data" may be zero, in which case the payload length is the length of the "Application data". 复制代码
Masking-key: 0 or 4 bytes
All frames sent from the client to the server are masked by a 32-bit value that is contained within the frame. This field is present if the mask bit is set to 1 and is absent if the mask bit is set to 0. See Section 5.3 for further information on client- to-server masking. 复制代码
Payload data: (x+y) bytes
The "Payload data" is defined as "Extension data" concatenated with "Application data". 复制代码
在开始解析数据帧之前,我们来定义一些数据结构和方法:
type Conn struct {
conn net.Conn
br *bufio.Reader
writeBuf []byte // 写缓存
readLength int64 //数据长度
maskKey [4]byte // mask key
}
// 数据帧位
// RFC6455 5.2节
const (
finalBit = 1 << 7
rsv1Bit = 1 << 6
rsv2Bit = 1 << 5
rsv3Bit = 1 << 4
opCode = 0xf
maskBit = 1 << 7
pladloadLen = 0x7f
)
// 消息类型
// RFC6455 5.2节或11.8节
const (
ContinuationMessage = 0
TextMessage = 1
BinaryMessage = 2
CloseMessage = 8
PingMessage = 9
PongMessage = 10
)
func (c *Conn)read(n int) ([]byte, error) {
// 读取n个字节数据
p, err := c.br.Peek(n)
// 丢弃掉n个字节数据
c.br.Discard(len(p))
return p, err
}
func newConn(conn net.Conn, br *bufio.Reader) *Conn {
c := &Conn{
conn:conn,
br:br,
writeBuf:make([]byte, 128), // 写死,只接受128字节数据
}
return c
}
复制代码
读取数据帧头信息
在网络中,数据是以字节为单位进行传输的。在websocket中,重要的头信息都在前2个字节中,读取前2个字节:
p, err := c.read(2)
if err != nil {
return err
}
// 解析数据帧 RFC6455 5.2节
// 强制按0判断,不考虑是否有扩展信息
if rsv := p[0] & (rsv1Bit | rsv2Bit | rsv3Bit); rsv != 0 {
return fmt.Errorf("RSV必须为0")
}
// 表示这是消息的最后一个片段。第一个片段也有可能是最后一个片段。
// 暂时不考虑FIN位信息
// final := p[0]&finalBit != 0
frameType := int(p[0]&opCode)
// 判断FIN和opcode为是否匹配
// RFC6455 5.4节
// todo
switch frameType {
case ContinuationMessage:
case TextMessage, BinaryMessage:
case CloseMessage, PingMessage, PongMessage:
default:
return fmt.Errorf("未知的opcode")
}
复制代码
All frames sent from client to server have this bit set to 1.
客户端的数据帧中mask位必须为1:
mask := p[1]&maskBit != 0
if !mask {
return fmt.Errorf("mask位必须标记为1")
}
复制代码
再来获取应用数据的长度
Payload length: 7 bits, 7+16 bits, or 7+64 bits
c.readLength = int64(p[1]&pladloadLen)
复制代码
如果数据长度小于等于125,实际的值就是应用数据的长度。如果等于126,那么接下来的2个字节解释为一个16bit的无符号整形,作为应用数据的长度。如果等于127,那么接下来的8个字节解释为一个64bit的无符号整形,作为应用数据的长度。
// 获取数据长度
// https://tools.ietf.org/html/rfc6455#section-5.2
// The length of the "Payload data", in bytes: if 0-125, that is the
// payload length. If 126, the following 2 bytes interpreted as a
// 16-bit unsigned integer are the payload length. If 127, the
// following 8 bytes interpreted as a 64-bit unsigned integer (the
// most significant bit MUST be 0) are the payload length. Multibyte
// length quantities are expressed in network byte order.
c.readLength = int64(p[1]&pladloadLen)
switch c.readLength {
case 126:
p, err := c.read(2)
if err != nil {
return err
}
c.readLength = int64(binary.BigEndian.Uint16(p))
case 127:
p, err := c.read(8)
if err != nil {
return err
}
c.readLength = int64(binary.BigEndian.Uint64(p))
}
复制代码
获取mask-key掩码:
Masking-key: 0 or 4 bytes
p, err := c.read(4)
if err != nil {
return err
}
复制代码
我们只发送最简单的数据,其他数据帧信息都不存在,我们就不兼容处理了。
读取应用数据
读取应用数据就简单了:
// 读取长度加1,是为了简单处理,直接将EOF也读取出来
var p = make([]byte, c.readLength+1)
n, err := c.br.Read(p)
if err != nil {
return nil, err
}
复制代码
因为读取的数据是掩码之后的数据,所以我们需要解码。掩码和解码的算法5.3节已详细介绍过。
Octet i of the transformed data ("transformed-octet-i") is the XOR of octet i of the original data ("original-octet-i") with octet at index i modulo 4 of the masking key ("masking-key-octet-j"):
j = i MOD 4 transformed-octet-i = original-octet-i XOR masking-key-octet-j 复制代码
// 只支持英文和数字
func maskBytes(key [4]byte, pos int, b []byte) int {
for i := range b {
b[i] ^= key[pos%4]
pos++
}
return pos%4
}
maskBytes(c.maskKey, 0, p[:n])
复制代码
测试一下:
$ go run *.go
rsv msg WebSocket rocks
复制代码
数据读取成功。下面开始写信息。
写应用数据
继续简单处理,只支持写125字节的数据。因为超过125字节的数据,Payload len位就要做特殊处理了。这样的话,主要的头信息就在前2个字节中了。
强制文本信息,并且RSV都设置0,FIN直接为1:
// 第一个字节(前8位)
// 默认为文本信息
b0 := byte(TextMessage)
// FIN 1
b0 |= finalBit
复制代码
然后设置MASK位和应用数据长度:
b1 := byte(0)
b1 |= byte(len(msg))
复制代码
因为这里已经限制只能写不超过125字节的数据了,所以可以直接设置Payload len位。 写入缓存:
c.writeBuf[0] = b0
c.writeBuf[1] = b1
复制代码
头信息设置完了,可以写应用数据了:
func (c *Conn) WriteMessage(msg []byte) error {
...
copy(c.writeBuf[2:], msg)
_, err := c.conn.Write(c.writeBuf)
return err
}
复制代码
再来测试一下:
至此,我们一个超简单、只能学习用的websocket协议实现了。
websocket的扩展信息和二进制类型的信息我们都没处理,ping、pong等都没处理,而且数据分片、粘包处理等等我们都没处理。所以说,这只是学习用。
项目的代码地址gitee.com/ask/danmaku
作者:W_懒虫
链接:https://juejin.im/post/6865219393895268365
来源:掘金
相关推荐
- 单点登录(SSO)解决方案介绍(单点登录概念)
-
一、单点登录的介绍单点登录(SingleSignOn),简称为SSO,是目前比较流行的企业业务整合的解决方案之一。SSO的定义是在多个应用系统中,用户只需要登录一次就可以访问所有相互信任的应用系...
- 系统登录的三种方式,哪一种更安全?
-
登录是一个高频的动作,笔者抓住这一个小点,分析了系统登录的几种方式和对应的场景。今天谈谈登录。登录即用户输入用户名和密码登录进系统中。B端系统,对于登录的业务场景有两种(可能不止,目前遇到过这两种):...
- 到底什么是单点登录(SSO)?(什么叫做单点登录)
-
什么是单点登录?单点登录(SingleSign-On,简称SSO)是一种集中式的身份验证和授权机制,用户只需在一处输入一次凭证(例如用户名和密码)就可以访问多个相关但独立的软件系统。在数字化时代,...
- 5年稳如老狗的单点登录系统,到底是怎么搞出来的?
-
说到单点登录(SingleSign-On,简称SSO),大家的第一反应可能是——啊不就是登录一次,能到处串门儿嘛?别说,还真差不多,就是这么个意思。但真要搭一套好用、耐造、还能扛住公司里各种奇奇怪...
- 这些负载均衡都解决哪些问题?服务、网关、NGINX?
-
在微服务项目中,有服务的负载均衡、网关的负载均衡、Nginx的负载均衡,这几个负载均衡分别用来解决什么问题呢?一、服务的负载均衡先抛出一个问题:...
- Nginx负载均衡最全详解(4大算法原理机制)
-
Nginx在大型网站架构很重要,也是大厂重点考察方向,今天我就重点来详解Nginx负载均衡@mikechen本篇已收于mikechen原创超30万字《阿里架构师进阶专题合集》里面。Nginx负载均衡N...
- 负载均衡 Nginx Session 一致性(nginx 负载均衡 会话保持)
-
HTTPS请求跳转...
- 监控Oracle Cloud负载均衡器:Applications Manager释放最佳性能
-
设想你正在运营一个受欢迎的在线学习平台,在考试前的高峰期,平台流量激增。全球的学生同时登录,观看视频、提交作业和参加测试。如果OracleCloud负载均衡器不能高效地分配流量,或者后端服务器难...
- Nginx负载均衡:nginx.conf配置文件说明!
-
大家好,欢迎来到程序视点!我是你们的老朋友.小二!在此记录下Nginx服务器nginx.conf负载均衡的配置文件说明,部分注释收集与网络.关于nginx.conf基本的配置,请查看上一篇文章!Ng...
- Java高可用系统架构中的负载均衡策略
-
Java高可用系统架构中的负载均衡策略在现代的分布式系统中,负载均衡策略是构建高可用系统的基石。Java开发者需要深刻理解这些策略,以便打造稳定且高效的系统。接下来,让我们一起揭开负载均衡的神秘面纱。...
- Spring Boot3 客户端负载均衡全解析:从原理到实战
-
在当今互联网大厂后端技术开发的激烈竞争环境中,构建高效、稳定的微服务架构是核心诉求。其中,SpringBoot3作为热门开发框架,其客户端负载均衡功能对于提升系统性能、保障服务稳定性起着关键作用。...
- MySql高可用集群MySQL Router负载均衡读写分离
-
名词解释MGR:MysqlGroupReplication组复制,多台MySQL服务器在同一组中会自动保持同步状态,当某台服务器故障时,整个复制组依然可以保持正常并对外提供服务。...
- 性能测试之tomcat+nginx负载均衡(nginx tomcat)
-
nginxtomcat配置准备工作:两个tomcat执行命令cp-rapache-tomcat-8.5.56apache-tomcat-8.5.56_2修改被复制的tomcat2下con...
- win10/11双网卡链路聚合叠加负载均衡提升网速解决网卡网速瓶颈!
-
双网卡链路聚合一种网络配置技术,通过将多个物理网卡绑定在一起,形成一个逻辑上的网络接口,以提高网络的可靠性、可用性和性能。这种技术通常用于服务器和网络设备中,以实现负载均衡、冗余和高可用性。本机环境:...