Go 分布式学习利器(20)-- Go并发编程之多路选择和超时控制,channel的关闭和广播
Select 多路选择
基本使用语法如下:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息t.Logf("result %s \n",ret)
case ret := <-retCh2:t.Logf("result %s \n", rest)
default :t.Error("return empty")
关于channel部分其实是阻塞的,也就是select实际执行的过程中会阻塞在对应的channel部分,直到某一个case对应的channel有有效的数据才会执行该case下的逻辑。
实际程序执行的过程中 如果出现两个channel同时有有效数据,那两个case内部的执行顺序是无法严格保证的,只能由程序员自己来控制。
Select 超时控制
同样如上代码,我们要控制channel获取数据的时间,防止channel中等待有效数据时间过长,所以可以增加一些超时控制:
select {
case ret := <-retCh1: //阻塞事件,等待channel1的消息t.Logf("result %s \n",ret)// 等待1s 返回一个channel的有效数据,且第一个case还未得到有效数据,则输出超时
case <- time.After(time.Second * 1): t.Error("time out")
所以 select 可以用于保证多个协程之间的高可用,防止slow response的出现。
以上两种select多路选择用法 的 测试代码如下:
package select_testimport ("fmt""testing""time"
)
func service() string {time.Sleep(time.Millisecond * 400)return "Service1 is Done"
}func AsyncService(i int) chan string {rech := make(chan string,1) // 声明一个channelvar ret stringgo func() {if i == 1 {ret = service()} else {ret = service1()}fmt.Println("resturned result")rech <- ret // 向 channel 中放数据fmt.Println("service exited")}()return rech // 返回channel
}// 测试超时机制来避免等待channel时间过长
// ret 这个channel需要等待AsyncService 函数中的routine中的
// service函数返回结果,才会将数据填充到ret 中
// service需要执行400ms,所以这里会出现超时的情况
func TestSelectTimeout(t *testing.T) {select {case ret := <-AsyncService(1):t.Logf("result is %s", ret)case <- time.After(time.Millisecond * 100):t.Error("time out")}
}func service1() string {time.Sleep(time.Millisecond * 500)return "Service2 is Done"
}// 测试多个channel返回数据,挑选其中一个先准备好的channel来执行
func TestSelect(t *testing.T) {select {case ret1 := <- AsyncService(1):t.Logf("result is %s",ret1)case ret2 := <- AsyncService(2):t.Logf("result is %s", ret2)case <- time.After(time.Millisecond * 600):t.Log("Time out")}
}
channel 的关闭和广播
channel 可以说是Go语言中 协程之间通信的一种机制,支持带buffer和不带buffer两种模式,非常方便得实现不同协程之间的通信过程,但是在具体的通信过程中也会暴露一些问题,如下生产者,消费者代码:
package close_channelimport ("fmt""sync""testing"
)// 数据生产者
func dataProducer(ch chan int, wg *sync.WaitGroup) {go func() {for i := 0; i < 10; i ++ {ch <- i}wg.Done()}()
}// 数据消费者
func dataComsumer(ch chan int, wg *sync.WaitGroup) {go func() {// 没有办法准确知道channel中什么时候没有数据,这里保持和生产者相同的填充数据的次数for i := 0;i < 10; i++ { data := <-chfmt.Printf("consumer data %d\n",data)}wg.Done()}()
}func TestProducer(t *testing.T) {var wg sync.WaitGroup // wait groupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Wait() // 阻塞,直到waitgroup执行完毕,wg的值变为0
输出如下:
=== RUN TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
--- PASS: TestProducer (0.00s)
上面代码中消费者协程在channel buffer 内部没有数据的时候只能够被动阻塞等待,直到channel中数据有效。这个实现导致生产者消费者之间的代码耦合度比较高,且当程序中存在多个producer和多个receiver的时候,receivers并不一定能够确切得知道什么时候producer才不会生产数据。
还是如上代码,我们如果启动多个消费者就能够很明显得看到问题,如下测试代码:
func TestProducer(t *testing.T) {var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg)wg.Wait()
}
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 5
consumer data 6
consumer data 7
consumer data 8
consumer data 9
consumer data 0
consumer data 0
consumer data 4
consumer data 0
consumer data 0
consumer data 0
consumer data 0
consumer data 0
执行的过程中可以发现消费者消费了0,因为这个时候生产者已经不再生产数据了,再去消费的话会取到channel默认的值即0,且channel没有关闭,消费者还在等待有效的数据,还会一直阻塞程序运行。
所以channel 也提供了主动关闭的机制,即当生产者不再发送数据的时候可以主动关闭channel,而消费者再次使用channel的时候只需要确认一下channel的状态即可。如果channel为不可用,即可返回。
关于 关闭的channel 需要注意如下几点:
- 向关闭的channel 发送数据会导致panic异常
- v,ok <- ch; 接受channel的值和状态,如果ok为true,则表示channel可以接受数据;如果ok 为false,表示channel已经关闭,无法接受数据
- 所有channel的接受者在channel关闭的时候都会从阻塞等待中返回上述OK值为false。这个广播机制可以被用作向多个订阅者发送信号。
修改测试代码如下:
package close_channelimport ("fmt""sync""testing"
)func dataProducer(ch chan int, wg *sync.WaitGroup) {go func() {for i := 0; i < 10; i ++ {ch <- i}close(ch)//ch <- 11 // 向关闭后的channel发送数据会报panic错误wg.Done()}()
}func dataComsumer(ch chan int, wg *sync.WaitGroup) {go func() {for {if data,ok := <-ch; ok {// 接受关闭后channel的广播,保证channel的输出结果是有效的fmt.Printf("consumer data %d\n",data)}else {break}}wg.Done()}()
}func TestProducer(t *testing.T) {var wg sync.WaitGroupch := make(chan int)wg.Add(1)dataProducer(ch, &wg)wg.Add(1)dataComsumer(ch, &wg) // 启动多个消费者wg.Add(1)dataComsumer(ch, &wg)wg.Wait()
}
输出如下:
=== RUN TestProducer
consumer data 0
consumer data 1
consumer data 2
consumer data 3
consumer data 4
consumer data 5
consumer data 6
consumer data 7
consumer data 9
consumer data 8
--- PASS: TestProducer (0.00s)
可能部分数据的输出顺序和单个消费者的数据输出顺序有差异,因为消费者也是各自的独立协程,所以在获取数据并输出的顺序会有差异。
相关文章:

Java项目:网盘系统设计和实现(java+ssm+jpa)
源码获取:博客首页 "资源" 里下载! 很多同学都有自己的网盘,方便存储一些java学习教程。该毕业设计实现了一个简易的网盘,包含文件上传和文件分享等功能。 后端技术采用了spring,spring mvc,JPA&…

快速学习的方法论
大多数人认为学习的快慢取决于学习者的天赋,实际上研究表明学习方法起着至关重要的作用。更深层次的知识加工,与时而反复的温故知新,在某些情况下会加倍你的学习效率。最近学习了如何快速学习的方法论,分享给大家。 是否能加速理解…

C#拉姆达(=)表达式
前言: 之前小猪曾经分享过自己对C#委托的一点理解 其实在使用委托的过程中我们会大量的使用拉姆达(>)表达式 介绍: "Lambda表达式"是一个匿名函数,是一种高效的类似于函数式编程的表达式,Lambda简化了开发中需要编写…
Python爬虫入门教程 57-100 python爬虫高级技术之验证码篇3-滑动验证码识别技术
滑动验证码介绍 本篇博客涉及到的验证码为滑动验证码,不同于极验证,本验证码难度略低,需要的将滑块拖动到矩形区域右侧即可完成。 这类验证码不常见了,官方介绍地址为:https://promotion.aliyun.com/ntms/act/captchaI…

FlameScope 更高级全面的火焰图
FlameScope 更高级全面的火焰图 文章目录FlameScope 更高级全面的火焰图安装步骤安装问题fix使用方式网飞(Netflix)开发的火焰图工具能够更好得呈现出一段时间内的服务器on/off cpu 的热力图。安装步骤 $ git clone https://github.com/Netflix/flamescope $ cd flamescope $ …

sql 基础--mysql 5 (6)
12.子查询 子查询进行过滤 mysql> select msg from pw_luck where name wang5-> ; ------ | msg | ------ | 1001 | | 1000 | | 1000 | | 100 | | 100 | ------ 5 rows in set (0.03 sec)mysql> select uid from pw_luck where msg in (select msg from pw_luck w…

Java项目:就业管理系统设计和实现(java+springboot+ssm)
源码获取:博客首页 "资源" 里下载! 就业管理系统: 该毕业设计采用了spring boot,spring,spring mvc,mybatis作为后端技术框架,这些组合稳定抗打,前端使用了layui,界面美观…

算法设计与分析之循环与递归
前言:循环与递归可以说是算法设计中最基本但却也是最重要的工具方法。循环和递归对于学习过高级程序设计语言的人来说都并不陌生,但还是有必要仔细的探究一下循环和递归之间的相似和区别。循环与递归最大的相似之处莫不是在于他们在算法设计中的工具作用…

面向对象与软件工程---团队作业1
1.队伍名称: 遥遥万里(还有很长路要走的意思) 2.队员信息: 陈雄(组长) 学号:1700509024 博客园链接:https://www.cnblogs.com/bearchan/ 廖鹏辉 学号:1700802007 博客园…

从paxos到raft zab,为何raft能够“独领风骚”
文章目录RAFT出现的缘由RAFT 的实现STATE MACHINELog Replicated State MachineLeader Election基本角色关键变量基本选举过程Log Replicated基本概念基本操作SafetyLog Replication: Consistency checkLeader Election: Leader Completeness总结RAFT 和 ZAB 的对比参考文献:阅…

Java项目:前台+后台精品水果商城系统设计和实现(java+Springboot+ssm+mysql+jsp+maven)
源码获取:博客首页 "资源" 里下载! 一、项目简述 本系统主要实现的功能有: 前台用户的登录注册,水果商品的展示,水果的购物车, 购物车新增结算等等,银行卡的支付绑定,收货…
Android屏幕像素密度适配详解
讲到像素密度,我们先要搞明白什么是像素密度,像素密度的字面上的意思为手机屏幕上一定尺寸区域内像素的个数。在Android开发中, 我们一般会使用每英寸像素密度(dpi)这样一个单位来表示手机屏幕的像素密度,d…

如让自己想学不好shell编程都困难?
众所周知,shell是linux运维必备的技术,必须要掌握,但是shell语法复杂,灵活,网友掌握了语法也不知道如何应用到实际运维中,老男孩培训shell编程给所有linux运维人员带来了学好shell的法宝,老男孩培训2014最新…

sqlserver可将字符转成数字再进行sum,如果varchar类型中存放的都是数字
sqlserver语法: select sum(cast(score as int)) as score from 表名; 注意:int是整型,在实际操作中根据自己需要的类型转换。转载于:https://www.cnblogs.com/MisMe/p/10690748.html

LSM 优化系列(六)-- 【ATC‘20】MatrixKV : NVM 的PMEM 在 LSM-tree的write stall和写放大上的优化
文章目录LSM 问题背景MatrixKV 设计细节整体架构介绍Matrix Container介绍ReceiverRowTableCompactorSpace managementColumn Compaction介绍对于Column Compaction的总结读加速 Cross-row Hint SearchMatrixKv 写入完整流程MatrixKV 读取完整流程MatrixKV 性能总结这篇论文大家…

Java项目:前台+后台在线考试系统设计和实现(java+Springboot+ssm+mysql+jsp+maven)
源码获取:博客首页 "资源" 里下载! 一、项目简述 本系统主要实现的功能有: 学生以及老师的注册登录,在线考试,错题查询,学生管理,问题管理,错题管理,错题查询…

修改nginx服务器类型
通常nginx服务器不隐藏服务器类型及版本信息 curl -I http://www.aaa.com 获取web服务器的类型和版本代码 HTTP/1.1 200 OK Server: nginx nginx/0.8.53 Date: Tue, 14 Dec 2010 08:10:06 GMT Content-Type: text/html Content-Length: 151 Last-Modified: Mon, 13 Dec 2…

JS 自带函数
JS数组方法汇总 array数组元素的添加和删除js数组元素的添加和删除一直比较迷惑,今天终于找到详细说明的资料了,先给个我测试的代码^-^var arr new Array();arr[0] "aaa";arr[1] "bbb";arr[2] "ccc";//alert(arr.leng…

Flink学习笔记:Operators之CoGroup及Join操作
本文为《Flink大数据项目实战》学习笔记,想通过视频系统学习Flink这个最火爆的大数据计算框架的同学,推荐学习课程: Flink大数据项目实战:http://t.cn/EJtKhaz 1. Window CoGroup与Join 1.1回顾RDBMS各种join 假设有两个表A和B 1.…

Rocksdb 的优秀代码(二)-- 工业级 打点系统 实现分享
文章目录前言数据结构选型打点代码设计耗时打点请求计数打点打点总结前言 一个完善的分布式系统一定是需要完善的打点统计,不论是对系统内核 还是 对系统使用者都是十分必要的。系统的客户需要直观得看到这个系统的性能相关的指标来决定是否使用以及如何最大化使用…

JVM中可生成的最大Thread数量
最近想测试下Openfire下的最大并发数,需要开大量线程来模拟客户端。对于一个JVM实例到底能开多少个线程一直心存疑惑,所以打算实际测试下,简单google了把,找到影响线程数量的因素有下面几个: -Xms intial java heap s…

Java项目:在线电影售票系统设计和实现(java+Springboot+ssm+mysql+jsp+maven)
源码获取:博客首页 "资源" 里下载! 一、项目简述 前台: 1、正在上映的电影浏览查看。 2、影院信息浏览查看。 3、新闻咨询信息浏览查看。 4、地域信息查看切换。 5、用户注册登录。 6、电影排期查看。 7、在线选座生成…

matlab正态分布
normrnd(mu, sigma, m,n) 返回m x n的随机数,正态分布均值mu,标准差sigma。 mvnrnd(mu, sigma, m) 返回m个随机数(点),是多元正太分布,mu是均值向量,sigma是协方差。 x normrnd(0,4,1,100000);…

MYSQL语句
-- 一、管理数据库-- 1.1 创建数据库CREATE DATABASE day15; SHOW DATABASES; CREATE TABLE student( id INT, NAME VARCHAR(20), age INT); -- 查看表SHOW TABLES; -- 二、管理数据-- 1.1插入数据(insert into)-- 需求: 往学生表插入数据INS…

Intel Optane PMEM 概览
文章目录前言基本架构编程模型PMDK接口架构接口概览pmdk 安装开发文档汇总PMEM性能官方性能实测性能前言 随着以PCM 为存储单元的3D XPoint 非易失存储介质 不断精进的工艺,以及 上层硬件协议栈的飞速发展,为非易失内存这样硬件的出现提供了技术工艺基础…

Java项目:新闻发布系统(java+Springboot+ssm+mysql+maven)
源码获取:博客首页 "资源" 里下载! 一、项目简述 功能: 区分为管理员用户和普通用户,管理员用户能删除评论, 调整新闻显示/隐藏,修改新闻,删除普通用户,普通用户能 登陆浏…

Linux下搭建Lotus Domino集群
Linux下搭建Lotus Domino 集群本文内容是Linux平台下Lotus Domino服务器部署案例(http://chenguang.blog.51cto.com/350944/1334595)的另一个模块,所以大家首先要有以上基础之后然后继续实验。集群是 Lotus Domino Server 提供的最重要特性之…

Centos下卸载openjdk并安装自定义jdk
1、查看是否安装了openjdk java -version 2、查看需要卸载的openjdk信息,其中只需要删除红色框标记的地方 rpm -qa | grep java 3、删除openjdk rpm -e --nodeps 需要删除的java组件 4、创建文件夹java mkdir java 5、到官网下载linux版本的jdk(如果不能…

pmdk -- libpmemlog 介绍
文章目录1. libpmemlog 应用背景2. libpmemlog 使用方式2.1 基本接口2.2 接口使用3. Libpmemlog 性能3.1 write sys call 性能3.2 libpmemlog 性能1. libpmemlog 应用背景 本文介绍的是英特尔 傲腾持久化内存 pmdk中 的一个持久化日志的库。 我们正常系统中会将日志 形成一个…

Java项目:家庭财务管理系统(java+Springboot+ssm+mysql+maven)
源码获取:博客首页 "资源" 里下载! 一、项目简述 功能: 家庭财务管理系统,具有收入统计,支出统计,汇总报 表,工资录入,其他收入等录入开支信息,echart图标插 …