当前位置: 首页 > 编程日记 > 正文

Golang 搭建 WebSocket 应用(八) - 完整代码

本文应该是本系列文章最后一篇了,前面留下的一些坑可能后面会再补充一下,但不在本系列文章中了。

整体架构

再来回顾一下我们的整体架构:

在这里插入图片描述

在我们的 demo 中,包含了以下几种角色:

  1. 客户端:一般是浏览器,用于接收消息;
  2. Hub:消息中心,用于管理所有的客户端连接,以及将消息推送给客户端;
  3. 调用 /send 发送消息的应用:用于将消息发送给 Hub,然后由 Hub 将消息推送给客户端。

然后,每一个 WebSocket 连接都有一个关联的读协程和写协程,
用于读取客户端发送的消息,以及将消息推送给客户端。

目录结构

├── LICENSE  // 协议
├── Makefile // 一些常用的命令
├── README.md
├── authenticator.go      // 认证器
├── authenticator_test.go // 认证器测试
├── bytes.go // 字符串和 []byte 之间转换的辅助方法
├── client.go // WebSocket 客户端
├── go.mod    // 项目依赖
├── go.sum    // 项目依赖
├── hub.go    // 消息中心
├── main.go   // 程序入口
├── message   // 消息记录器
│   ├── db_logger.go
│   ├── db_logger_test.go
│   ├── log.go
│   └── stdout_logger.go
├── server.go // HTTP 服务
└── server_test.go // HTTP 接口的测试

运行

注:需要 Go 1.20 或以上版本

  1. 下载依赖:

可以使用七牛云的代理加速下载。

go mod tidy
  1. 启动 WebSocket 服务端:
go run main.go

Hub 代码

最终,我们的 Hub 代码演进成了下面这样:

// bufferSize 通道缓冲区、map 初始化大小
const bufferSize = 128

// Handler 错误处理函数
type Handler func(log message.Log, err error)

// Hub 维护了所有的客户端连接
type Hub struct {
	// 注册请求
	register chan *Client
	// 取消注册请求
	unregister chan *Client
	// 记录 uid 跟 client 的对应关系
	userClients map[string]*Client
	// 互斥锁,保护 userClients 以及 clients 的读写
	sync.RWMutex
	// 消息记录器
	messageLogger message.Logger
	// 错误处理器
	errorHandler Handler
	// 验证器
	authenticator Authenticator
	// 等待发送的消息数量
	pending atomic.Int64
}

// 默认的错误处理器
func defaultErrorHandler(log message.Log, err error) {
	res, _ := json.Marshal(log)
	fmt.Printf("send message: %s, error: %s\n", string(res), err.Error())
}

func newHub() *Hub {
	return &Hub{
		register:      make(chan *Client),
		unregister:    make(chan *Client),
		userClients:   make(map[string]*Client, bufferSize),
		RWMutex:       sync.RWMutex{},
		messageLogger: &message.StdoutMessageLogger{},
		errorHandler:  defaultErrorHandler,
		authenticator: &JWTAuthenticator{},
	}
}

// 注册、取消注册请求处理
func (h *Hub) run() {
	for {
		select {
		case client := <-h.register:
			h.Lock()
			h.userClients[client.uid] = client
			h.Unlock()
		case client := <-h.unregister:
			h.Lock()
			close(client.send)
			delete(h.userClients, client.uid)
			h.Unlock()
		}
	}
}

// 返回 Hub 的当前的关键指标
func metrics(hub *Hub, w http.ResponseWriter) {
	pending := hub.pending.Load()
	connections := len(hub.userClients)
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP connections 连接数\n# TYPE connections gauge\nconnections %d\n", connections)))
	_, _ = w.Write([]byte(fmt.Sprintf("# HELP pending 等待发送的消息数量\n# TYPE pending gauge\npending %d\n", pending)))
}

其中:

  • Hub 中的 registerunregister 通道用于处理客户端的注册和取消注册请求;
  • Hub 中的 userClients 用于记录 uidClient 的对应关系;
  • Hub 中的 messageLogger 用于记录消息;
  • Hub 中的 errorHandler 用于处理错误;
  • Hub 中的 authenticator 用于验证客户端的身份;
  • Hub 中的 pending 用于记录等待发送的消息数量。

目前实现存在的问题:

  • registerunregister 通道被消费的时候需要加锁,这样会导致 registerunregister 变成串行的,性能不好;
  • userClients 也是需要加锁的,这样会导致 userClients 的读写也是串行的,性能不好;

对于这两个问题,前面我们讨论过,一种可行的办法分段 map,然后对每一个 map 都有一个对应的 sync.Mutex 互斥锁来保证其读写的安全。

Client 代码

Client 比较关键的方法是:

  • writePump:负责将消息推送给客户端。
  • serveWs:处理 WebSocket 连接请求。
  • send:处理消息发送请求。

writePump

这个方法会从 send 通道中获取消息,然后推送给客户端。
推送失败会调用 errorHandler 处理错误。
推送成功会将 pending 减一。

// writePump 负责推送消息给 WebSocket 客户端
//
// 该方法在一个独立的协程中运行,我们保证了每个连接只有一个 writer。
// Client 会从 send 请求中获取消息,然后在这个方法中推送给客户端。
func (c *Client) writePump() {
	defer func() {
		_ = c.conn.Close()
	}()

	// 从 send 通道中获取消息,然后推送给客户端
	for {
		messageLog, ok := <-c.send

		// 设置写超时时间
		_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
		// c.send 这个通道已经被关闭了
		if !ok {
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		if err := c.conn.WriteMessage(websocket.TextMessage, StringToBytes(messageLog.Message)); err != nil {
			c.hub.errorHandler(messageLog, err)
			c.hub.pending.Add(int64(-1 * len(c.send)))
			return
		}

		c.hub.pending.Add(int64(-1))
	}
}

serveWs

serveWs 方法会处理 WebSocket 连接请求,然后将其注册到 Hub 中。
在连接的时候会对客户端进行认证,认证失败会断开连接。
最后会启动读写协程。

// serveWs 处理 WebSocket 连接请求
func serveWs(hub *Hub, w http.ResponseWriter, r *http.Request) {
	// 升级为 WebSocket 连接
	conn, err := upgrader.Upgrade(w, r, nil)
	if err != nil {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("upgrade error: %s", err.Error())))
		return
	}

	// 认证失败的时候,返回错误信息,并断开连接
	uid, err := hub.authenticator.Authenticate(r)
	if err != nil {
		_ = conn.SetWriteDeadline(time.Now().Add(time.Second))
		_ = conn.WriteMessage(websocket.TextMessage, []byte(fmt.Sprintf("authenticate error: %s", err.Error())))
		_ = conn.Close()
		return
	}

	// 注册 Client
	client := &Client{hub: hub, conn: conn, send: make(chan message.Log, bufferSize), uid: uid}
	client.conn.SetCloseHandler(closeHandler)
	// register 无缓冲,下面这一行会阻塞,直到 hub.run 中的 <-h.register 语句执行
	// 这样可以保证 register 成功之后才会启动读写协程
	client.hub.register <- client

	// 启动读写协程
	go client.writePump()
	go client.readPump()
}

send

send 是一个 http 接口,用于处理消息发送请求。
它会从 Hub 中获取 uid 对应的 Client,然后将消息发送给客户端。

// send 处理消息发送请求
func send(hub *Hub, w http.ResponseWriter, r *http.Request) {
	uid := r.FormValue("uid")
	if uid == "" {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte("uid is required"))
		return
	}

	// 从 hub 中获取 uid 关联的 client
	hub.RLock()
	client, ok := hub.userClients[uid]
	hub.RUnlock()
	if !ok {
		w.WriteHeader(http.StatusBadRequest)
		_, _ = w.Write([]byte(fmt.Sprintf("client not found: %s", uid)))
		return
	}

	// 记录消息
	messageLog := message.Log{Uid: uid, Message: r.FormValue("message")}
	_ = hub.messageLogger.Log(messageLog)

	// 发送消息
	client.send <- messageLog

	// 增加等待发送的消息数量
	hub.pending.Add(int64(1))
}

github

完整代码可以在 github 上进行查看:https://github.com/eleven26/go-pusher

相关文章:

并发编程下的集合:数组寻址、LinkedList、HashMap、ConcurrentHashMap

如果发现hash取模后的数组索引位下无元素则直接新增,若不是空那就说明存在hash冲突,则判断数组索引位链表结构中的第一个元素的key以及hash值是否与新的key一致则直接覆盖,若不一致则判断当前的数组索引下的链表结构是否为红黑树,若为红黑树则走红黑树的新增方法,若不为红黑树则遍历当前链表结构,遍历中发现某个节点元素的next为null是则直接将新元素指针与next进行关联,若在遍历到next为空前判断到,某个节点的key以及key的hash值与新的key与新的keyhash值一致时则走覆盖。

【Mongdb之数据同步篇】什么是Oplog、Mongodb 开启oplog,java监听oplog并写入关系型数据库、Mongodb动态切换数据源

oplog是local库下的一个固定集合,Secondary就是通过查看Primary 的oplog这个集合来进行复制的。每个节点都有oplog,记录这从主节点复制过来的信息,这样每个成员都可以作为同步源给其他节点。Oplog 可以说是Mongodb Replication的纽带了。

【日常开发之Windows共享文件】Java实现Windows共享文件上传下载

下拉框选择你选择的用户点击添加,然后共享确定。创建一个文件夹然后点击属性界面,点击共享。maven版本存在于SMB协议的兼容问题。首先开启服务,打开控制面板点击程序。点击启用或关闭Windows功能。我这边是专门创建了一个用户。SMB1.0选中红框内的。

rust wasm入门

demo## 编译 Rust 为 WebAssembly在本教程中,我们将使用 Rust 的 npm 包构建工具 wasm-pack 来构建一个 npm 包。

iced 入门一

本教程的目标是创建一个简单的购物清单应用程序。我们希望允许添加和删除购物清单中的项目。在编写代码之前,我们必须首先了解 Iced 构建的结构:Elm 架构。它是 GUI 库使用的架构,最初用于 Elm 编程语言。它的核心原则很简单。它围绕三个概念构建:模型、视图和更新。

注解annotation

Kubernetes的系统组件(例如,kube-scheduler、kube-controller-manager、kube-apiserver、kubectl 或其他第三方组件)向用户的Kubernetes对象添加注解时,必须指定一个前缀。注解(annotation)可以用来向 Kubernetes 对象的 metadata.annotations 字段添加任意的信息。除了使用注解,您也可以将这类信息存放在一个外部的数据库,然而,在使用、分享这些信息的时候,可能会变得难以管理。

Rust XTask 模式介绍与应用

XTask(扩展任务)是一种在Rust项目中定义和执行自定义构建任务的方式。它通过创建一个独立的Rust库或二进制项目来封装这些任务,利用Rust语言的强类型、安全性和跨平台能力,使得构建流程更加健壮、可读和可维护。

ModuleNotFoundError: No module named ‘qcloud_cos‘

是腾讯云提供的一个Python SDK,用于与腾讯云对象存储(COS)服务进行交互。使用pip安装qcloud_cos报以下错误。这个错误表示Python无法找到名为。

在Java中使用WebSocket

WebSocket是一种协议,用于在Web应用程序和服务器之间建立实时、双向的通信连接。它通过一个单一的TCP连接提供了持久化连接,这使得Web应用程序可以更加实时地传递数据。WebSocket协议最初由W3C开发,并于2011年成为标准。

需要在method方法被调用之后,仅打印出a=100,b=200,请写出method方法的代码

通常,此流对应于显示器输出或者由主机环境或用户指定的另一个输出目标。通常,此流对应于键盘输入或者由主机环境或用户指定的另一个输入源。public static final PrintStream err“标准”错误输出流。PrintStream 是打印输出流,它继承于FilterOutputStream。第二个用的是用的是char类型,根本不是方法,当要输出方法体的时候,会给你遍历数组。通常,此流对应于显示器输出或者由主机环境或用户指定的另一个输出目标。诡异的是,如果错了,面试官对你说了一句:你回去看看,

一个合格的Java选手必须要掌握的并发锁知识

Java内置锁:基于Java语法层面(关键词)实现的锁,主要是根据Java语义来实现,最典型的应用就是synchronized。Java显式锁:基于JDK层面实现的锁,主要是根据基于Lock接口和ReadWriteLock接口,以及统一的AQS基础同步器等来实现,最典型的有ReentrantLock。使用方式:synchronized关键字互斥锁主要有作用于对象方法上面,作用于类静态方法上面,作用于对象方法里面,作用于类静态方法里面等4种方式。

Integer.toHexString(b & 0xff)理解以及& 0xff什么意思

首先toHexString传的参数应该是int类型32位,此处传的是byte类型8位,所以前面需要补24个0。然后& 0xff 就是把前面24个0去掉只要后8位。toHexString(b & 0xff)相当于做了一次位的与运算,将前24位字符省略,将后8位保留。是两个十六进制的数,每个f用二进制表示是1111,所以占四位(bit),两个f()占八位(bit),八位(bit)也就是一个字节(byte).这个方法是把字节(转换成了int)以16进制的方式显示。我的理解是这样,如有不对欢迎指正!

使用JavaScript实现复杂功能:一个完整的电商网站搜索功能

随着互联网的发展,电子商务网站已经成为人们购物的重要平台。而在这些网站中,搜索功能无疑是核心功能之一。用户可以通过搜索快速找到他们需要的商品,从而提高购物体验。本文将详细介绍如何使用JavaScript实现一个完整的电商网站搜索功能。

C++并发编程:互斥锁std::mutex和lock_guard的使用

对象离开其作用域时,会自动调用析构函数,该析构函数会释放锁。这确保了在任何情况下(包括由于异常等原因导致的提前退出),锁都会被正确释放,从而避免了忘记手动释放锁而导致的死锁问题。mutex 用于控制多个线程访问共享资源,确保在任意时刻只有一个线程可以访问该资源,避免数据竞争。这确保了同一时刻只有一个线程可以访问被保护的资源,从而防止多线程并发访问导致的数据不一致性。是 C++ 标准库中提供的一个模板类,用于在其构造时自动获取锁,在析构时自动释放锁。是 C++ 标准库中提供的一种用于多线程同步的互斥锁实现。

上位机图像处理和嵌入式模块部署(qt插件的使用)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 一个软件一般有很多的功能,但是主流程只有一个。但在软件开发的过程当中,一般来说功能是需要不断添加的,但是主流程最好不要轻易修改。这里的插件就相当于各种各样的功能,而主流程就是如何怎么去调用这些插件的功能。所以,今天正好来学一下怎么添加qt插件,个人觉得这部分还是非常重要的。

C程序的内存空间布局(栈、堆、数据区、常量区、代码区)

较详细的介绍了栈、堆、数据区、常量区、代码区

Java中的四种访问权限(private,public,protected,无修饰)

/实体类属性和数据库字段名称不一致//实体类属性和数据库字段名称不一致return id;return age;emp.test();//直接调用public修饰的变量//private修饰的变量进行赋值//调用private修饰的变量1、public修饰符定义的属性和方法通过对象实例化进行调用,2、private修饰的属性通过set、get方法进行调用。

websocket服务端本地部署

即登录cpolar官网后,点击预留,保留一个固定tcp端口地址,然后将其配置到相应的隧道中即可。这里我们用cpolar内网穿透来映射内网端口,它支持http/https/tcp协议,不限制流量,无需公网ip,也不用设置路由器,操作简单。注意:该隧道选择的是临时tcp地址和端口,24小时内会变化,如需固定tcp地址,可升级为专业套餐做tcp地址固定!cpolar安装成功后,默认会配置两个默认隧道:一个ssh隧道和一个website隧道,可自行删减或者修改。,可以查看到token码,复制并执行命令进行认证。

Java中的方法重载和方法重写有什么区别?

Java中的方法重载(Overloading)和方法重写(Overriding)都是面向对象编程中的重要概念,但它们之间有一些区别。方法重载是指在同一个类中,可以定义多个具有相同名称但参数列表不同的方法。这些方法具有不同的参数类型、参数个数或参数顺序。在调用重载方法时,Java编译器会根据传递给方法的参数类型和数量来选择要调用的正确方法。方法重载主要用于解决方法的命名冲突和提高代码的可读性和可维护性。

python基础使用之变量,表达式,语句

PYTHON基础知识系列之变量、表达式、语句

C语言常见面试题:什么是宏,宏的作用是什么?

宏在计算机科学中是一种批量处理程序命令,它是一种抽象的规则或模式,用于说明某一特定输入(通常是字符串)如何根据预定义的规则转换成对应的输出(通常也是字符串)。在编译时,预处理器会对宏进行展开,即将宏的内容替换到宏所在的位置。以上是宏的一些主要作用,但并不是全部。在实际编程中,根据需要选择是否使用宏以及如何使用宏,以实现更好的代码组织和可读性。,这样就可以计算出a和b的和。这个例子展示了宏的基本用法和作用。在这个例子中,我们定义了一个宏。,用于计算两个数的和。时,预处理器会将其展开为。

python基础小知识:引用和赋值的区别

通过引用,就可以在程序范围内任何地方传递大型对象而不必在途中进行开销巨大的赋值操作。不过需要注意的是,这种赋值仅能做到顶层赋值,如果出现嵌套的情况下仍不能进行深层赋值。赋值与引用不同,复制后会产生一个新的对象,原对象修改后不会影响到新的对象。如果在原位置修改这个可变对象时,可能会影响程序其他位置对这个对象的引用

Python自动化实战之接口请求的实现

作为一位过来人也是希望大家少走一些弯路,如果你不想再体验一次学习时找不到资料,没人解答问题,坚持几天便放弃的感受的话,在这里我给大家分享一些自动化测试的学习资源,希望能给你前进的路上带来帮助。

C#winform上位机开发学习笔记3-串口助手的信息保存功能添加

上位机开发的系列学习笔记,避免遗忘多记录多补充多优化

前端JS代码中Object类型数据的相关知识

遍历JavaScript中的对象有几种方法,包括使用for…in循环、Object.keys()方法、Object.values()方法和Object.entries()方法。因此前端传入了日期类型数据之后,如果和后台数据库中的数据类型不一致,比如数据库中的日期数据类型格式是。前端传入的Object对象中其中某个字段值是日期类型的数据,则在前端的类型就是一个。,则数据传往后端之前需要做格式类型转换。,它的值是一个中国标准时间,比如。

Rust之旅 - Rust概念、Windows安装、环境配置

本章节介绍Rust概念、Windows安装、环境配置以及最初级的语法。至此,我们就成功的构建了一个Rust程序,并成功在Visual Studio Code里运行了这个程序,万事俱备,我们就可以开始Rust之旅了。资料获取,更多粉丝福利,关注下方公众号获取。

C语言中常用的字符串处理函数和内存操作函数

`memmove(void *destination, const void *source, size_t num)`:将`source`指向的内存块的前`num`个字节移动到`destination`所指向的内存块,即使内存块有重叠部分。返回指向`destination`的指针。- `memcpy(void *destination, const void *source, size_t num)`:将`source`指向的内存块的前`num`个字节复制到`destination`所指向的内存块。

一键式Excel分词统计工具:如何轻松打包Python脚本为EXE

最近,表姐遇到了一个挑战:需要从Excel文件中统计出经过分词处理的重复字段,但由于数据隐私问题,这些Excel文件不能外传。这种情况下,直接使用Excel内置功能好像是行不通的,需要借助Python脚本来实现。为了解决这个问题,我写了一个简单的数据分析和自动化办公脚本,以方便使用。想象一下,即使电脑上没有安装Python,也能通过一个简单的EXE文件轻松完成工作,这是多么方便!因此,我决定不仅要写出这个脚本,还要学会如何将其打包成一个独立的EXE文件。这样,无需Python环境的电脑也能直接运行它

深入解析JavaScript的原生原型

在JavaScript中,除了自定义对象,还存在很多由JavaScript语言本身提供的原生对象。这些原生对象同样基于原型继承机制,拥有自己的原型。理解原生对象的原型非常重要,可以让我们正确使用这些内置对象,也有助于进一步理解JavaScript的原型继承系统。本文将详细解析原生对象的原型结构,揭开一些常见原生对象原型的神秘面纱。​学习原生对象的原型关系,有助于我们在日常开发中正确理解和使用这些JavaScript内置对象,避免一些常见陷阱。

WebSocket 入门实战

这个简单示例演示了如何使用 Spring Boot 和 Spring WebSocket 创建一个基本的 WebSocket 服务。通过这个例子,可以了解 WebSocket 在实时通信中的应用,如果大家在平时工作当中有遇到需要实时推送的场景,比如大屏实时展示数据变化,就可以用这种发放时。