当前位置: 首页 > 编程日记 > 正文

Golang 搭建 WebSocket 应用(八) - 完整代码

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

在这里插入图片描述

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程,
用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go      // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod    // 项目依赖
├── go.sum    // 项目依赖
├── hub.go    // 消息中心
├── main.go   // 程序入口
├── message   // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

go mod tidy
  1. 启动 WebSocket 服务端:
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128

// Handler 错误处理函数
type Handler func(log message.Log, err error)

// Hub 维护了所有的客户端连接
type Hub struct {
	// 注册请求
	register chan *Client
	// 取消注册请求
	unregister chan *Client
	// 记录 uid 跟 client 的对应关系
	userClients map[string]*Client
	// 互斥锁,保护 userClients 以及 clients 的读写
	sync.RWMutex
	// 消息记录器
	messageLogger message.Logger
	// 错误处理器
	errorHandler Handler
	// 验证器
	authenticator Authenticator
	// 等待发送的消息数量
	pending atomic.Int64
}

// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {
	res, _ := json.Marshal(log)
	fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}

func newHub() *Hub {
	return &Hub{
		register:      make(chan *Client),
		unregister:    make(chan *Client),
		userClients:   make(map[string]*Client, bufferSize),
		RWMutex:       sync.RWMutex{},
		messageLogger: &message.StdoutMessageLogger{},
		errorHandler:  defaultErrorHandler,
		authenticator: &JWTAuthenticator{},
	}
}

// 注册、取消注册请求处理
func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.Lock()
			h.userClients[client.uid] = client
			h.Unlock()
		case client := <-h.unregister:
			h.Lock()
			close(client.send)
			delete(h.userClients, client.uid)
			h.Unlock()
		}
	}
}

// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {
	pending := hub.pending.Load()
	connections := len(hub.userClients)
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。
推送失败会调用 errorHandler 处理错误。
推送成功会将 pending 减一。

// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {
	defer func() {
		_ = c.conn.Close()
	}()

	// 从 send 通道中获取消息,然后推送给客户端
	for {
		messageLog, ok := <-c.send

		// 设置写超时时间
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		// c.send 这个通道已经被关闭了
		if !ok {
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {
			c.hub.errorHandler(messageLog, err)
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		c.hub.pending.Add(int64(-1))
	}
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。
在连接的时候会对客户端进行认证,认证失败会断开连接。
最后会启动读写协程。

// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	// 升级为 WebSocket 连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))
		return
	}

	// 认证失败的时候,返回错误信息,并断开连接
	uid, err := hub.authenticator.Authenticate(r)
	if err != nil {
		_ = conn.SetWriteDeadline(time.Now().Add(time.Second))
		_ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))
		_ = conn.Close()
		return
	}

	// 注册 Client
	client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}
	client.conn.SetCloseHandler(closeHandler)
	// register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行
	// 这样可以保证 register 成功之后才会启动读写协程
	client.hub.register <- client

	// 启动读写协程
	go client.writePump()
	go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。
它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
	uid := r.FormValue("uid")
	if uid == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("uid is required"))
		return
	}

	// 从 hub 中获取 uid 关联的 client
	hub.RLock()
	client, ok := hub.userClients[uid]
	hub.RUnlock()
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))
		return
	}

	// 记录消息
	messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}
	_ = hub.messageLogger.Log(messageLog)

	// 发送消息
	client.send <- messageLog

	// 增加等待发送的消息数量
	hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher

相关文章:

一个合格的Java选手必须要掌握的并发锁知识

Java内置锁:基于Java语法层面(关键词)实现的锁,主要是根据Java语义来实现,最典型的应用就是synchronized。Java显式锁:基于JDK层面实现的锁,主要是根据基于Lock接口和ReadWriteLock接口,以及统一的AQS基础同步器等来实现,最典型的有ReentrantLock。使用方式:synchronized关键字互斥锁主要有作用于对象方法上面,作用于类静态方法上面,作用于对象方法里面,作用于类静态方法里面等4种方式。

Integer.toHexString(b & 0xff)理解以及& 0xff什么意思

首先toHexString传的参数应该是int类型32位,此处传的是byte类型8位,所以前面需要补24个0。然后& 0xff 就是把前面24个0去掉只要后8位。toHexString(b & 0xff)相当于做了一次位的与运算,将前24位字符省略,将后8位保留。是两个十六进制的数,每个f用二进制表示是1111,所以占四位(bit),两个f()占八位(bit),八位(bit)也就是一个字节(byte).这个方法是把字节(转换成了int)以16进制的方式显示。我的理解是这样,如有不对欢迎指正!

使用JavaScript实现复杂功能:一个完整的电商网站搜索功能

随着互联网的发展,电子商务网站已经成为人们购物的重要平台。而在这些网站中,搜索功能无疑是核心功能之一。用户可以通过搜索快速找到他们需要的商品,从而提高购物体验。本文将详细介绍如何使用JavaScript实现一个完整的电商网站搜索功能。

C++并发编程:互斥锁std::mutex和lock_guard的使用

对象离开其作用域时,会自动调用析构函数,该析构函数会释放锁。这确保了在任何情况下(包括由于异常等原因导致的提前退出),锁都会被正确释放,从而避免了忘记手动释放锁而导致的死锁问题。mutex 用于控制多个线程访问共享资源,确保在任意时刻只有一个线程可以访问该资源,避免数据竞争。这确保了同一时刻只有一个线程可以访问被保护的资源,从而防止多线程并发访问导致的数据不一致性。是 C++ 标准库中提供的一个模板类,用于在其构造时自动获取锁,在析构时自动释放锁。是 C++ 标准库中提供的一种用于多线程同步的互斥锁实现。

上位机图像处理和嵌入式模块部署(qt插件的使用)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 一个软件一般有很多的功能,但是主流程只有一个。但在软件开发的过程当中,一般来说功能是需要不断添加的,但是主流程最好不要轻易修改。这里的插件就相当于各种各样的功能,而主流程就是如何怎么去调用这些插件的功能。所以,今天正好来学一下怎么添加qt插件,个人觉得这部分还是非常重要的。

C程序的内存空间布局(栈、堆、数据区、常量区、代码区)

较详细的介绍了栈、堆、数据区、常量区、代码区

Java中的四种访问权限(private,public,protected,无修饰)

/实体类属性和数据库字段名称不一致//实体类属性和数据库字段名称不一致return id;return age;emp.test();//直接调用public修饰的变量//private修饰的变量进行赋值//调用private修饰的变量1、public修饰符定义的属性和方法通过对象实例化进行调用,2、private修饰的属性通过set、get方法进行调用。

websocket服务端本地部署

即登录cpolar官网后,点击预留,保留一个固定tcp端口地址,然后将其配置到相应的隧道中即可。这里我们用cpolar内网穿透来映射内网端口,它支持http/https/tcp协议,不限制流量,无需公网ip,也不用设置路由器,操作简单。注意:该隧道选择的是临时tcp地址和端口,24小时内会变化,如需固定tcp地址,可升级为专业套餐做tcp地址固定!cpolar安装成功后,默认会配置两个默认隧道:一个ssh隧道和一个website隧道,可自行删减或者修改。,可以查看到token码,复制并执行命令进行认证。

Java中的方法重载和方法重写有什么区别?

Java中的方法重载(Overloading)和方法重写(Overriding)都是面向对象编程中的重要概念,但它们之间有一些区别。方法重载是指在同一个类中,可以定义多个具有相同名称但参数列表不同的方法。这些方法具有不同的参数类型、参数个数或参数顺序。在调用重载方法时,Java编译器会根据传递给方法的参数类型和数量来选择要调用的正确方法。方法重载主要用于解决方法的命名冲突和提高代码的可读性和可维护性。

python基础使用之变量,表达式,语句

PYTHON基础知识系列之变量、表达式、语句

C语言常见面试题:什么是宏,宏的作用是什么?

宏在计算机科学中是一种批量处理程序命令,它是一种抽象的规则或模式,用于说明某一特定输入(通常是字符串)如何根据预定义的规则转换成对应的输出(通常也是字符串)。在编译时,预处理器会对宏进行展开,即将宏的内容替换到宏所在的位置。以上是宏的一些主要作用,但并不是全部。在实际编程中,根据需要选择是否使用宏以及如何使用宏,以实现更好的代码组织和可读性。,这样就可以计算出a和b的和。这个例子展示了宏的基本用法和作用。在这个例子中,我们定义了一个宏。,用于计算两个数的和。时,预处理器会将其展开为。

python基础小知识:引用和赋值的区别

通过引用,就可以在程序范围内任何地方传递大型对象而不必在途中进行开销巨大的赋值操作。不过需要注意的是,这种赋值仅能做到顶层赋值,如果出现嵌套的情况下仍不能进行深层赋值。赋值与引用不同,复制后会产生一个新的对象,原对象修改后不会影响到新的对象。如果在原位置修改这个可变对象时,可能会影响程序其他位置对这个对象的引用

Python自动化实战之接口请求的实现

作为一位过来人也是希望大家少走一些弯路,如果你不想再体验一次学习时找不到资料,没人解答问题,坚持几天便放弃的感受的话,在这里我给大家分享一些自动化测试的学习资源,希望能给你前进的路上带来帮助。

C#winform上位机开发学习笔记3-串口助手的信息保存功能添加

上位机开发的系列学习笔记,避免遗忘多记录多补充多优化

前端JS代码中Object类型数据的相关知识

遍历JavaScript中的对象有几种方法,包括使用for…in循环、Object.keys()方法、Object.values()方法和Object.entries()方法。因此前端传入了日期类型数据之后,如果和后台数据库中的数据类型不一致,比如数据库中的日期数据类型格式是。前端传入的Object对象中其中某个字段值是日期类型的数据,则在前端的类型就是一个。,则数据传往后端之前需要做格式类型转换。,它的值是一个中国标准时间,比如。

Rust之旅 - Rust概念、Windows安装、环境配置

本章节介绍Rust概念、Windows安装、环境配置以及最初级的语法。至此,我们就成功的构建了一个Rust程序,并成功在Visual Studio Code里运行了这个程序,万事俱备,我们就可以开始Rust之旅了。资料获取,更多粉丝福利,关注下方公众号获取。

C语言中常用的字符串处理函数和内存操作函数

`memmove(void *destination, const void *source, size_t num)`:将`source`指向的内存块的前`num`个字节移动到`destination`所指向的内存块,即使内存块有重叠部分。返回指向`destination`的指针。- `memcpy(void *destination, const void *source, size_t num)`:将`source`指向的内存块的前`num`个字节复制到`destination`所指向的内存块。

一键式Excel分词统计工具:如何轻松打包Python脚本为EXE

最近,表姐遇到了一个挑战:需要从Excel文件中统计出经过分词处理的重复字段,但由于数据隐私问题,这些Excel文件不能外传。这种情况下,直接使用Excel内置功能好像是行不通的,需要借助Python脚本来实现。为了解决这个问题,我写了一个简单的数据分析和自动化办公脚本,以方便使用。想象一下,即使电脑上没有安装Python,也能通过一个简单的EXE文件轻松完成工作,这是多么方便!因此,我决定不仅要写出这个脚本,还要学会如何将其打包成一个独立的EXE文件。这样,无需Python环境的电脑也能直接运行它

深入解析JavaScript的原生原型

在JavaScript中,除了自定义对象,还存在很多由JavaScript语言本身提供的原生对象。这些原生对象同样基于原型继承机制,拥有自己的原型。理解原生对象的原型非常重要,可以让我们正确使用这些内置对象,也有助于进一步理解JavaScript的原型继承系统。本文将详细解析原生对象的原型结构,揭开一些常见原生对象原型的神秘面纱。​学习原生对象的原型关系,有助于我们在日常开发中正确理解和使用这些JavaScript内置对象,避免一些常见陷阱。

WebSocket 入门实战

这个简单示例演示了如何使用 Spring Boot 和 Spring WebSocket 创建一个基本的 WebSocket 服务。通过这个例子,可以了解 WebSocket 在实时通信中的应用,如果大家在平时工作当中有遇到需要实时推送的场景,比如大屏实时展示数据变化,就可以用这种发放时。

深入三目运算符:JavaScript、C++ 和 Python 比较

三目运算符是编程中常用的条件表达式,它允许我们根据条件选择不同的值。我们将通过具体的例子分别介绍 JavaScript、C++ 和 Python 中的三目运算符,以便更好地理解它们的用法和特性。JavaScript 示例// 例子: 根据条件选择不同的值var x = 10;var y = 20;"x 大于 y" : "x 不大于 y";在这个例子中,如果x大于y,则result的值为 “x 大于 y”,否则为 “x 不大于 y”。C++ 示例// 例子: 根据条件选择不同的值。

Java中的4种引用类型,你知道几种?

Java作为一门面向对象的编程语言,内存管理一直是程序员需要关注的重要方面。在Java中,垃圾回收机制负责自动管理内存,而引用类型则是垃圾回收的重要参考。本文将深入讨论Java中的四种引用类型:强引用、弱引用、软引用和虚引用,以及它们在内存管理中的应用和区别。

python实现网络爬虫代码_python如何实现网络爬虫

2、【find()】和【find_all()】方法可以遍历这个html文件,提取指定信息。return soup.find_all(string=re.compile( '百度' )) #结合正则表达式,实现字符串片段匹配。print(res) #打印输出[root@localhost demo]# python3 demo1.py。[root@localhost demo]# vim demo.py#web爬虫学习 -- 分析。r.raise_for_status() #如果状态码不是200,产生异常。

深入浅出:Golang内存逃逸机制与性能优化技巧

内存逃逸发生在编译期间,当编译器判断一个变量的生命周期超出了其当前作用域,它会将该变量分配到堆上,而非栈上。这种情况虽然保证了程序的正确性,但会增加垃圾回收的负担,从而影响程序性能。通过本文的深入探讨,我们了解了Golang内存逃逸的基本原理,以及它是如何影响程序性能的。我们探讨了内存逃逸的常见场景,并通过具体的代码示例展示了这些问题的出现和解决方法。最重要的是,我们学习了一系列性能优化技巧,包括数据结构优化、减少不必要的指针使用、利用栈空间、使用内存池,以及定期进行性能分析。

深入解析JavaScript中new Function语法

Function是JavaScript中非常重要的内置构造函数,可以用来动态创建函数。new Function语法就是其中一种函数创建方式。但是new Function也有一定的缺点需要注意。本文将带您深入解析new Function语法,了解其应用场景以及需要注意的问题。​new Function是动态创建函数的一种方式,但也有缺点。为了更好的代码质量和性能,应该慎用或避免使用。对JavaScript函数和作用域有深入理解,可以编写出更简洁、高效、稳定的代码。​。

RTSP协议播放不兼容TPLINK摄像头的处理办法

报错的内容是Number of element invalid in origin string.两个数字中间多了一个空格,导致判断数据不等于6。所以数据输入的时候把中间的空格去掉一个即可。

编码技巧:如何在Golang中高效解析和生成XML

在本文中,我们详细探讨了在Golang中高效处理XML的各个方面。从基础的XML概念到解析和生成XML文件的具体步骤,再到错误处理、调试技巧以及一些高级技巧和最佳实践,我们提供了一个全面的指南,旨在帮助读者掌握在Golang中处理XML的技能。理解Golang中XML处理的基本概念和方法。使用包来解析和生成XML文件。有效地处理常见的XML解析和生成中的错误。应用最佳实践和高级技巧来优化XML处理的性能和安全性。

Java中 && 和|| 在同一个 if 里面使用 会出现啥问题&Java中运算符“|”和“||”以及“&”和“&&”区别

如果在同一个 if 语句中同时使用 && 和 || 运算符,可能会导致逻辑错误。这样就会导致逻辑错误,当 a 为 false,b 为 true 时,输出结果会是 “Goodbye”,而不是我们期望的 “Hello World”。假设有两个布尔类型的变量 a 和 b,我们要判断当 a 为 true 或者 b 为 true 时输出 “Hello World”,否则输出 “Goodbye”。:不论运算符左侧为true还是false,右侧语句都会进行判断,下面代码。

tomcat缺少awt支持的解决&java.lang.NoClassDefFoundError: Could not initialize class sun.awt.X11GraphicsEnvir

这几天,线上的项目出现了一个问题,就是二维码图片没有出来,考虑到图像都是用到awt库,可能是tomcat没有图像库的问题,给tomcat加上awt的支持就解决了。加在catalina.sh的开头的JAVA_OPTS环境变量中加入,然后重启搞定。

如何用pthon连接mysql和mongodb数据库【极简版】

发现宝藏 前言 1. 连接mysql 1.1 安装 PyMySQL 1.2 导入 PyMySQL 1.3 建立连接 1.4 创建游标对象 1.5 执行查询 1.6 关闭连接 1.7 完整示例 2. 连接mongodb 2.1 安装 PyMongo 2.2 导入 PyMongo 2.3 建立连接 2.4