Colly源码解析——结合例子分析底层实现
通过《Colly源码解析——框架》分析,我们可以知道Colly执行的主要流程。本文将结合http://go-colly.org上的例子分析一些高级设置的底层实现。(转载请指明出于breaksoftware的csdn博客)
递归深度
以下例子截取于Basic
c := colly.NewCollector(// Visit only domains: hackerspaces.org, wiki.hackerspaces.orgcolly.AllowedDomains("hackerspaces.org", "wiki.hackerspaces.org"),)// On every a element which has href attribute call callbackc.OnHTML("a[href]", func(e *colly.HTMLElement) {link := e.Attr("href")// Print linkfmt.Printf("Link found: %q -> %s\n", e.Text, link)// Visit link found on page// Only those links are visited which are in AllowedDomainsc.Visit(e.Request.AbsoluteURL(link))})
c是Collector指针,它的Visit方法给scrape传递的“深度”值是1。
func (c *Collector) Visit(URL string) error {return c.scrape(URL, "GET", 1, nil, nil, nil, true)
}
由于NewCollector构造的Collector.MaxDepth为0,而在scrape方法内部调用的requestCheck中,如果此值为0,则不会去做深度检测
// requestCheck methodif c.MaxDepth > 0 && c.MaxDepth < depth {return ErrMaxDepth}
如果希望通过MaxDepth控制深度,则可以参见Max depth例子
c := colly.NewCollector(// MaxDepth is 1, so only the links on the scraped page// is visited, and no further links are followedcolly.MaxDepth(1),)// On every a element which has href attribute call callbackc.OnHTML("a[href]", func(e *colly.HTMLElement) {link := e.Attr("href")// Print linkfmt.Println(link)// Visit link found on pagee.Request.Visit(link)})
第4行将深度设置为1,这样理论上只能访问第一层的URL。
如果OnHTML中的代码和Basic例子一样,即使用Collector的Visit访问URL,则由于其depth一直传1,而导致requestCheck的深度检测一直不满足条件,从而会访问超过1层的URL。
所以第13行,调用的是HTMLElement的Visit方法
func (r *Request) Visit(URL string) error {return r.collector.scrape(r.AbsoluteURL(URL), "GET", r.Depth+1, nil, r.Ctx, nil, true)
}
相较于Collector的Visit,HTMLElement的Visit方法将Depth增加了1,并且传递了请求的上下文(ctx)。由于depth有变化,所以之后的深度检测会返回错误,从而只会访问1层URL。
规则
Collector的Limit方法用于设置各种规则。这些规则最终在Collector的httpBackend成员中执行。
一个Collector只有一个httpBackend结构体指针,而一个httpBackend结构体可以有一组规则
type httpBackend struct {LimitRules []*LimitRuleClient *http.Clientlock *sync.RWMutex
}
规则针对Domain来区分,我们可以通过设定不同的匹配规则,让每组URL执行相应的操作。这些操作包括:
- 访问并行数
- 访问间隔延迟
参见Parallel例子。只截取其中关键一段
// Limit the maximum parallelism to 2// This is necessary if the goroutines are dynamically// created to control the limit of simultaneous requests.//// Parallelism can be controlled also by spawning fixed// number of go routines.c.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 2})
Collector的Limit最终会调用到httpBackend的Limit,它将规则加入到规则组后初始化该规则。
// Init initializes the private members of LimitRule
func (r *LimitRule) Init() error {waitChanSize := 1if r.Parallelism > 1 {waitChanSize = r.Parallelism}r.waitChan = make(chan bool, waitChanSize)hasPattern := falseif r.DomainRegexp != "" {c, err := regexp.Compile(r.DomainRegexp)if err != nil {return err}r.compiledRegexp = chasPattern = true}if r.DomainGlob != "" {c, err := glob.Compile(r.DomainGlob)if err != nil {return err}r.compiledGlob = chasPattern = true}if !hasPattern {return ErrNoPattern}return nil
}
第7行创建了一个可以承载waitChanSize个元素的channel。可以看到,如果我们在规则中没有设置并行数,也会创建只有1个元素的channel。这个channel会被用于调节并行执行的任务数量。所以这也就意味着,一旦调用了Limit方法而没设置Parallelism值,该Collector中针对符合规则的请求就会变成串行的。
第10和18行分别针对不同规则初始化一个编译器。因为这个操作比较重,所以在初始化时执行,之后只是简单使用这些编译器即可。
当发起请求时,流程最终会走到httpBackend的Do方法
func (h *httpBackend) Do(request *http.Request, bodySize int) (*Response, error) {r := h.GetMatchingRule(request.URL.Host)if r != nil {r.waitChan <- truedefer func(r *LimitRule) {randomDelay := time.Duration(0)if r.RandomDelay != 0 {randomDelay = time.Duration(rand.Int63n(int64(r.RandomDelay)))}time.Sleep(r.Delay + randomDelay)<-r.waitChan}(r)}
第2行通过域名查找对应的规则,如果找到,则在第4行尝试往channel中加入元素。这个操作相当于上锁。如果channel此时是满的,则该流程会被挂起。否则就执行之后的流程。在Do函数结束,命中规则的会执行上面的匿名函数,它在休眠规则配置的时间后,尝试从channel中获取数据。这个操作相当于释放锁。
Colly就是通过channel的特性实现了并行控制。
并行
在“规则”一节,我们讲到可以通过Parallelism控制并行goroutine的数量。httpBackend的Do方法最终将被Collector的fetch方法调用,而该方法可以被异步执行,即是一个goroutine。这就意味着承载Do逻辑的goroutine执行完毕后就会退出。而一种类似线程的技术在Colly也被支持,它更像一个生产者消费者模型。消费者线程执行完一个任务后不会退出,而在生产者生产出的物料池中取出未处理的任务加以处理。
以下代码截取于Queue
q, _ := queue.New(2, // Number of consumer threads&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage)……for i := 0; i < 5; i++ {// Add URLs to the queueq.AddURL(fmt.Sprintf("%s?n=%d", url, i))}// Consume URLsq.Run(c)
这次没有调用Collector的Visit等函数,而是调用了Queue的Run。
第2行创建了一个具有2个消费者(goroutine)的Queue。第10行预先给这个Queue加入5个需要访问的URL。
// AddURL adds a new URL to the queue
func (q *Queue) AddURL(URL string) error {u, err := url.Parse(URL)if err != nil {return err}r := &colly.Request{URL: u,Method: "GET",}d, err := r.Marshal()if err != nil {return err}return q.storage.AddRequest(d)
}
AddUrl的第11行将请求序列化,在第15行将该序列化数据保存到“仓库”中。
在Run方法中,Colly将启动2个goroutine。注意它是使用for循环组织的,这意味着如果for内无break,它会一直循环执行下去——不退出。
func (q *Queue) Run(c *colly.Collector) error {wg := &sync.WaitGroup{}for i := 0; i < q.Threads; i++ {wg.Add(1)go func(c *colly.Collector, wg *sync.WaitGroup) {defer wg.Done()for {
如果队列中没有需要处理的request,则会尝试退出
if q.IsEmpty() {if q.activeThreadCount == 0 {break}ch := make(chan bool)q.lock.Lock()q.threadChans = append(q.threadChans, ch)q.lock.Unlock()action := <-chif action == stop && q.IsEmpty() {break}}
activeThreadCount表示当前运行中的消费者goroutine数量。如果已经没有消费者了,则直接跳出for循环,整个goroutine结束。
如果还有消费者,则创建一个channel,并将其加入到q.threadChans的channel切片中。然后在第9行等待该channel被写入值。如果写入的是true并且此时没有需要处理的request,则退出goroutine。可以看到这段逻辑检测了两次是否有request,这个我们之后再讨论。
如果还有request要处理,则递增消费者数量(在finish中会递减以抵消)。然后从“仓库”中取出一个任务,在通过Request的Do方法发起请求,最后调用finish方法善后。
q.lock.Lock()atomic.AddInt32(&q.activeThreadCount, 1)q.lock.Unlock()rb, err := q.storage.GetRequest()if err != nil || rb == nil {q.finish()continue}r, err := c.UnmarshalRequest(rb)if err != nil || r == nil {q.finish()continue}r.Do()q.finish()}}(c, wg)}wg.Wait()return nil
}
finish方法干了三件事:
- 递减消费者数量,以抵消Run方法中的递增。
- 将Queue的各个等待中的,其他goroutine创建的channel传入true值,即告知他们可以退出了。
- 给Queue创建一个空的channel切片
func (q *Queue) finish() {q.lock.Lock()q.activeThreadCount--for _, c := range q.threadChans {c <- stop}q.threadChans = make([]chan bool, 0, q.Threads)q.lock.Unlock()
}
我们再看下怎么在请求的过程中给Queue增加任务
// AddRequest adds a new Request to the queue
func (q *Queue) AddRequest(r *colly.Request) error {d, err := r.Marshal()if err != nil {return err}if err := q.storage.AddRequest(d); err != nil {return err}q.lock.Lock()for _, c := range q.threadChans {c <- !stop}q.threadChans = make([]chan bool, 0, q.Threads)q.lock.Unlock()return nil
}
第3~9行,会将请求序列化后保存到“仓库”中。
第10~15行,会将其他goroutine创建的channel传入false,告知它们不要退出。然后再创建一个空的channel切片。
finish和AddRequest都使用锁锁住了所有的逻辑,而且它们都会把其他goroutine创建的channel传入值,然后将Queue的channel切片清空。这样就保证这些channel只可能收到一种状态。由于它自己创建的channel是在finish调用完之后才有机会创建出来,所以不会造成死锁。
再回来看goroutine退出的逻辑
if q.IsEmpty() {if q.activeThreadCount == 0 {break}ch := make(chan bool)q.lock.Lock()q.threadChans = append(q.threadChans, ch)q.lock.Unlock()action := <-chif action == stop && q.IsEmpty() {break}}
如果finish方法中递减的activeThreadCount为0,这说明这是最后一个goroutine了,而且当前也没request,所以退出。当然此时存在一种可能:在1行执行结束后,其他非消费者goroutine调用AddRequest新增了若干request。而执行第2行时,goroutine将退出,从而导致存在request没有处理的可能。
如果还存在其他goroutine,则本goroutine将在第5行创建一个channel,并将这个channel加入到Queue的channel切片中。供其他goroutine调用finish往channel中传入true,或者AddRequest传入false,调控是否需要退出本过程。在第9行等待channel传出数据前,可能存在如下几种情况:
- 执行了finish
- 执行了AddRequest
- 执行了finish后执行了AddRequest
- 执行了AddRequest后执行了finish
如果是第1和4种,action将是false。第2和3种,action是true。但是这个情况下不能单纯的通过action决定是否退出。因为第9和10行执行需要时间,这段时间其他goroutine可能还会执行AddRequest新增任务,或者GetRequest删除任务。所以还要在第10行检测下IsEmpty。
这段是我阅读Colly中思考的最多的代码,因为有goroutine和channel,导致整个逻辑比较复杂。也感慨下,虽然goroutine很方便,但是真的能把它写对也是不容易的。
分布式
在Queue例子中,我们看到“仓库”这个概念。回顾下Queue的例子,“仓库”是InMemoryQueueStorage。顾名思义,它是一个内存型的仓库,所以不存在分布式基础。
// create a request queue with 2 consumer threadsq, _ := queue.New(2, // Number of consumer threads&queue.InMemoryQueueStorage{MaxSize: 10000}, // Use default queue storage)
一个分布式的例子是Redis backend,截取一段
// create the redis storagestorage := &redisstorage.Storage{Address: "127.0.0.1:6379",Password: "",DB: 0,Prefix: "httpbin_test",}// add storage to the collectorerr := c.SetStorage(storage)if err != nil {panic(err)}// delete previous data from storageif err := storage.Clear(); err != nil {log.Fatal(err)}// close redis clientdefer storage.Client.Close()// create a new request queue with redis storage backendq, _ := queue.New(2, storage)
这儿创建了一个redis型的仓库。不仅Collector的Storage是它,Queue的Storage也是它。这样一个集群上的服务都往这个仓库里存入和取出数据,从而实现分布式架构。
redisstorage库引自github.com/gocolly/redisstorage。我们查看其源码,其实现了Collector的storage需要的接口
type Storage interface {// Init initializes the storageInit() error// Visited receives and stores a request ID that is visited by the CollectorVisited(requestID uint64) error// IsVisited returns true if the request was visited before IsVisited// is calledIsVisited(requestID uint64) (bool, error)// Cookies retrieves stored cookies for a given hostCookies(u *url.URL) string// SetCookies stores cookies for a given hostSetCookies(u *url.URL, cookies string)
}
以及Queue的storage需要的
// Storage is the interface of the queue's storage backend
type Storage interface {// Init initializes the storageInit() error// AddRequest adds a serialized request to the queueAddRequest([]byte) error// GetRequest pops the next request from the queue// or returns error if the queue is emptyGetRequest() ([]byte, error)// QueueSize returns with the size of the queueQueueSize() (int, error)
}
相关文章:

无限路由 DI-624+A 详细介绍
无线路由器硬件安装设置图解1、确认宽带线路正常:无线宽带路由器可以让您将家中的计算机共享高速宽带网络连结至互联网;但在此之前,您必须先具备一部基于以太网络的Cable/DSL Modem(使用RJ-45 接头),并确定您的宽带网络在只有连接…
教你如何编写第一个爬虫
2019年不管是编程语言排行榜还是在互联网行业,Python一直备受争议,到底是Java热门还是Python热门也是一直让人争吵的话题。随着信息时代的迭代更新,人工智能的兴起,Python编程语言也随之被人们广泛学习,Python数据分析…

【BZOJ】3542: DZY Loves March
题意 \(m * m\)的网格,有\(n\)个点。\(t\)个询问:操作一:第\(x\)个点向四个方向移动了\(d\)个单位。操作二:询问同行同列其他点到这个点的曼哈顿距离和。强制在线。(\(n \le 10^5,m \le 10^{18}\)ÿ…

Gin源码解析和例子——路由
Gin是一个基于golang的net包实现的网络框架。从github上,我们可以看到它相对于其他框架而言,具有优越的性能。本系列将从应用的角度来解析其源码。(转载请指明出于breaksoftware的csdn博客) 本文我们将分析其路由的原理。先看个例…
一文讲透推荐系统提供web服务的2种方式
作者丨gongyouliu编辑丨zandy来源 | 大数据与人工智能(ID: ai-big-data)推荐系统是一种信息过滤技术,通过从用户行为中挖掘用户兴趣偏好,为用户提供个性化的信息,减少用户的找寻时间,降低用户的决策成本&am…

jQuery遍历json数组怎么整。。。
{"options":"[{\"text\":\"王家湾\",\"value\":\"9\"},{\"text\":\"李家湾\",\"value\":\"10\"},{\"text\":\"邵家湾\",\"value\":\"13\…

述说C#中的值类型和引用类型的千丝万缕
关于值类型和引用类型方面的博客和文章可以说是汗牛充栋了,今天无意中又复读了一下这方面的知识,感觉还是有许多新感悟的,就此时间分享一下: CLR支持两种类型:值类型和引用类型,看起来FCL的大多数类型是引用…

Gin源码解析和例子——中间件(middleware)
在《Gin源码解析和例子——路由》一文中,我们已经初识中间件。本文将继续探讨这个技术。(转载请指明出于breaksoftware的csdn博客) Gin的中间件,本质是一个匿名回调函数。这和绑定到一个路径下的处理函数本质是一样的。 再以Engin…

DNS简单配置
DNS的原理就不说了,这里只是做个简单的配置,也是方便自己记忆,在这里还要十分感谢redking老大的教程!要安装的bind* 、caching-nameserver 包1、/var/named/chroot/etc/named.conf这个文件需要自己创建options { listen-on…
关系抽取论文整理,核方法、远程监督的重点都在这里
来源 | CSDN 博客作者 | Matt_sh,编辑 | Carol来源 | CSDN云计算(ID:CSDNcloud)本文是个人阅读文章的笔记整理,没有涉及到深度学习在关系抽取中的应用。笔记中一部分来自个人解读,一部分来自原文࿰…

freemarker内建函数介绍
Sequence的内置函数1.sequence?first 返回sequence的第一个值。2.sequence?last 返回sequence的最后一个值。3.sequence?reverse 将sequence的现有顺序反转,即倒序排序4.sequence?size 返回sequence的大小5.sequence?sort 将sequence中的对象转化为字符串后顺序…

PowerBuilder 11.x 的重要进步和不足
PowerBuilder 11(以下简称PB)出来有一段时间了,但很多用户对PB11的到底有哪些进步还不是很清楚,由于对PB11缺乏了解和信心,目前用PB11做出像样应用的用户不多,这确实非常遗憾,这里我讲一下我对P…
超赞的PyTorch资源大列表,GitHub标星9k+,中文版也上线了
点击阅读原文,快速报名!作者 | 红色石头来源 | AI有道(ID: redstonewill)自 2017 年 1 月 PyTorch 推出以来,其热度持续上升。PyTorch 能在短时间内被众多研究人员和工程师接受并推崇是因为其有着诸多优点,…

C++拾取——Linux下实测布隆过滤器(Bloom filter)和unordered_multiset查询效率
布隆过滤器是一种判定元素是否存在于集合中的方法。其基本原理是使用哈希方法将数据映射到一个很长的向量上。在维基百科上,它被称为“空间效率和查询时间都远远超过一般的算法”的方法。由于它只保存散列的数据,所以对于很长的数据有着良好的压缩特性&a…

递归思想解决输出目录下的全部文件
刚刚了解了下递归思想 递归就是在方法内调用本方法 下面说一个实际的应用 输出目录下的全部文件,当目录中还有目录时,则进入目录输出里面的文件 import java.io.*; class ShowFile{public static void showfile(File files){if(files.isDirectory()){Fi…

实战之网马解密之shellcode篇
今天上卡卡社区发现里面发了个网马解密的链接,呵呵 顺便试试看能解出来不.呵呵. 相信各位已经对网马有点了解了吧.一般网马都是加密了的.关于什么是网马以及怎么防止网马也不是本文的重点.本文是实战shellcode网马解密.以后的博文会放出常见的网马及其解密.以及常见的解密工具的…
机器学习中的线性回归,你理解多少?
作者丨algorithmia编译 | 武明利,责编丨Carol来源 | 大数据与人工智能(ID: ai-big-data)机器学习中的线性回归是一种来源于经典统计学的有监督学习技术。然而,随着机器学习和深度学习的迅速兴起,因为线性(多…

Golang反射机制的实现分析——reflect.Type类型名称
现在越来越多的java、php或者python程序员转向了Golang。其中一个比较重要的原因是,它和C/C一样,可以编译成机器码运行,这保证了执行的效率。在上述解释型语言中,它们都支持了“反射”机制,让程序员可以很方便的构建一…

设计模式----组合模式UML和实现代码
2019独角兽企业重金招聘Python工程师标准>>> 一、什么是组合模式? 组合模式(Composite)定义:将对象组合成树形结构以表示‘部分---整体’的层次结构。组合模式使得用户对单个对象和组合对象的使用具有一致性. 类型:结构型模式 顺口…

Golang反射机制的实现分析——reflect.Type方法查找和调用
在《Golang反射机制的实现分析——reflect.Type类型名称》一文中,我们分析了Golang获取类型基本信息的流程。本文将基于上述知识和经验,分析方法的查找和调用。(转载请指明出于breaksoftware的csdn博客) 方法 package mainimpor…
太狠!33岁年薪50万:“复工第一天,谢谢裁掉我!” 网友:有底气!
最近脉脉一则帖子炸锅了:某HR发帖称公司以按时下班为由裁员。这种情况下很多人都慌了,大家纷纷把“副业救国”奉为神律。可是你有没有认真的想过,为什么现在大家都需要副业:意外裁员后,房贷能够按时还上不至于“回收”…

SEO内部链接优化的技巧
内部链接是搜索引擎优化中的重要因素之一。思亿欧做的SEO调查发现,国内大部分网站都没有怎么做内部链接优化。这可能是网站管理员并不知晓SEO或者是对内部链接优化不够重视。 内部链接的设计不能是单纯的为了SEO的目的而作内部链接,同时要注意规划一个良…

Ubuntu 15.10安装ns2.35+nam
2019独角兽企业重金招聘Python工程师标准>>> Step1: 更新系统sudo apt-get update #更新源列表sudo apt-get upgrade #更新已经安装的包sudo apt-get dist-upgrade #更新软件,升级系统Step2:安装ns2需要的几个包sudo apt-get install build-essentialsu…

bug诞生记——不定长参数隐藏的类型问题
这个bug的诞生源于项目中使用了一个开源C库。由于对该C库API不熟悉,一个不起眼的错误调用,导致一系列诡异的问题。最终经过调试,我们发现发生了内存覆盖问题。为了直达问题根节,我将问题代码简化如下(转载请指明出于br…

yahoo註冊.com 域名1.99$/年
yahoo註冊.com 域名1.99$/年趕快去註冊吧http://order.sbs.yahoo.com/ds/reviewplanoption?.pYD1&mdom&.srcsbs&.promoBESTDEAL&dzzhen an.com支持paypal付款一個yahoo帳戶只能註冊一個如果覺得續費比較貴,可在註冊兩個月後轉出到godaddy.转载于:h…
Excel弱爆了!这个工具30分钟完成了我一天的工作量,零基础、文科生也能学!...
在大数据浪潮当中,数据分析是这个时代的不二“掘金技能”。我们每一个人,每天无时无刻都在生产数据,一分钟内,微博上新发的数据量超过10万,b站的视频播放量超过600万......这些庞大的数字,意味着什么&#…

Myeclipse快捷键的使用
存盘 Ctrls(肯定知道) 注释代码 Ctrl/ 取消注释 Ctrl\(Eclipse3已经都合并到Ctrl/了) 代码辅助 Alt/ 快速修复 Ctrl1 代码格式化 CtrlShiftf 整理导入 CtrlShifto 切换窗口 Ctrlf6 <可改为ctrltab方便> ctrlshiftM 导入未引用的包 ctrlw 关闭单个窗口 F3 跳转到类、变量的…

PL/SQL三种集合类型的比较
PL/SQL三种集合类型的比较<?xml:namespace prefix o ns "urn:schemas-microsoft-com:office:office" />集合是指在一个程序变量中包含多个值。PL/SQL提供的集合类型如下:Associative Array:TYPE t IS TABLE OF something INDEX BY PLS_INTEGER;N…
夺得WSDM Cup 2020大赛金牌的这份参赛方案,速来get!
近日,在美国休斯敦闭幕的第13届网络搜索与数据挖掘国际会议(WSDM 2020)上,华为云语音语义创新Lab带领的联合团队,摘得WSDM Cup 2020大赛“论文引用意图识别任务”金牌(Gold Medal)。WSDM被誉为全…

bug诞生记——信号(signal)处理导致死锁
这个bug源于项目中一个诡异的现象:代码层面没有明显的锁的问题,但是执行时发生了死锁一样的表现。我把业务逻辑简化为:父进程一直维持一个子进程。(转载请指明出于breaksoftware的csdn博客) 首先我们定义一个结构体Pro…