当前位置: 首页 > 编程日记 > 正文

Python并发编程实例教程

有关Python中的并发编程实例,主要是对Threading模块的应用,文中自定义了一个Threading类库。

一、简介
  我们将一个正在运行的程序称为进程。每个进程都有它自己的系统状态,包含内存状态、打开文件列表、追踪指令执行情况的程序指针以及一个保存局部变量的调用栈。通常情况下,一个进程依照一个单序列控制流顺序执行,这个控制流被称为该进程的主线程。在任何给定的时刻,一个程序只做一件事情。
  一个程序可以通过Python库函数中的os或subprocess模块创建新进程(例如os.fork()或是subprocess.Popen())。然而,这些被称为子进程的进程却是独立运行的,它们有各自独立的系统状态以及主线程。因为进程之间是相互独立的,因此它们同原有的进程并发执行。这是指原进程可以在创建子进程后去执行其它工作。
  虽然进程之间是相互独立的,但是它们能够通过名为进程间通信(IPC)的机制进行相互通信。一个典型的模式是基于消息传递,可以将其简单地理解为一个纯字节的缓冲区,而send()或recv()操作原语可以通过诸如管道(pipe)或是网络套接字(network socket)等I/O通道传输或接收消息。还有一些IPC模式可以通过内存映射(memory-mapped)机制完成(例如mmap模块),通过内存映射,进程可以在内存中创建共享区域,而对这些区域的修改对所有的进程可见。
  多进程能够被用于需要同时执行多个任务的场景,由不同的进程负责任务的不同部分。然而,另一种将工作细分到任务的方法是使用线程。同进程类似,线程也有其自己的控制流以及执行栈,但线程在创建它的进程之内运行,分享其父进程的所有数据和系统资源。当应用需要完成并发任务的时候线程是很有用的,但是潜在的问题是任务间必须分享大量的系统状态。
  当使用多进程或多线程时,操作系统负责调度。这是通过给每个进程(或线程)一个很小的时间片并且在所有活动任务之间快速循环切换来实现的,这个过程将CPU时间分割为小片段分给各个任务。例如,如果你的系统中有10个活跃的进程正在执行,操作系统将会适当的将十分之一的CPU时间分配给每个进程并且循环地在十个进程之间切换。当系统不止有一个CPU核时,操作系统能够将进程调度到不同的CPU核上,保持系统负载平均以实现并行执行。
  利用并发执行机制写的程序需要考虑一些复杂的问题。复杂性的主要来源是关于同步和共享数据的问题。通常情况下,多个任务同时试图更新同一个数据结构会造成脏数据和程序状态不一致的问题(正式的说法是资源竞争的问题)。为了解决这个问题,需要使用互斥锁或是其他相似的同步原语来标识并保护程序中的关键部分。举个例子,如果多个不同的线程正在试图同时向同一个文件写入数据,那么你需要一个互斥锁使这些写操作依次执行,当一个线程在写入时,其他线程必须等待直到当前线程释放这个资源。
Python中的并发编程
  Python长久以来一直支持不同方式的并发编程,包括线程、子进程以及其他利用生成器(generator function)的并发实现。
  Python在大部分系统上同时支持消息传递和基于线程的并发编程机制。虽然大部分程序员对线程接口更为熟悉,但是Python的线程机制却有着诸多的限制。Python使用了内部全局解释器锁(GIL)来保证线程安全,GIL同时只允许一个线程执行。这使得Python程序就算在多核系统上也只能在单个处理器上运行。Python界关于GIL的争论尽管很多,但在可预见的未来却没有将其移除的可能。
  Python提供了一些很精巧的工具用于管理基于线程和进程的并发操作。即使是简单地程序也能够使用这些工具使得任务并发进行从而加快运行速度。subprocess模块为子进程的创建和通信提供了API。这特别适合运行与文本相关的程序,因为这些API支持通过新进程的标准输入输出通道传送数据。signal模块将UNIX系统的信号量机制暴露给用户,用以在进程之间传递事件信息。信号是异步处理的,通常有信号到来时会中断程序当前的工作。信号机制能够实现粗粒度的消息传递系统,但是有其他更可靠的进程内通讯技术能够传递更复杂的消息。threading模块为并发操作提供了一系列高级的,面向对象的API。Thread对象们在一个进程内并发地运行,分享内存资源。(www.jbxue.com)使用线程能够更好地扩展I/O密集型的任务。multiprocessing模块同threading模块类似,不过它提供了对于进程的操作。每个进程类是真实的操作系统进程,并且没有共享内存资源,但multiprocessing模块提供了进程间共享数据以及传递消息的机制。通常情况下,将基于线程的程序改为基于进程的很简单,只需要修改一些import声明即可。
Threading模块示例
  以threading模块为例,思考这样一个简单的问题:如何使用分段并行的方式完成一个大数的累加。
import threading

class SummingThread(threading.Thread):
def __init__(self, low, high):
super(SummingThread, self).__init__()
self.low = low
self.high = high
self.total = 0

def run(self):
for i in range(self.low, self.high):
self.total += i

thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)

自定义Threading类库
  我写了一个易于使用threads的小型Python类库,包含了一些有用的类和函数。
关键参数:
  * do_threaded_work – 该函数将一系列给定的任务分配给对应的处理函数(分配顺序不确定)
  * ThreadedWorker – 该类创建一个线程,它将从一个同步的工作队列中拉取工作任务并将处理结果写入同步结果队列
  * start_logging_with_thread_info – 将线程id写入所有日志消息。(依赖日志环境)
  * stop_logging_with_thread_info – 用于将线程id从所有的日志消息中移除。(依赖日志环境)
import threading
import logging

def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
""" Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).

Parameters:
- num_threads Default: len(work_items) --- Number of threads to use process items in work_items.
- per_sync_timeout Default: 1 --- Each synchronized operation can optionally timeout.
- preserve_result_ordering Default: True --- Reorders result_item to match original work_items ordering.

Return:
--- list of results from applying work_func to each work_item. Order is optionally preserved.

Example:

def process_url(url):
# TODO: Do some work with the url
return url

urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

# process urls in parallel
result_items = do_threaded_work(urls_to_process, process_url)

# print(results)
print(repr(result_items))
"""
global wrapped_work_func
if not num_threads:
num_threads = len(work_items)

work_queue = Queue.Queue()
result_queue = Queue.Queue()

index = 0
for work_item in work_items:
if preserve_result_ordering:
work_queue.put((index, work_item))
else:
work_queue.put(work_item)
index += 1

if preserve_result_ordering:
wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))

start_logging_with_thread_info()

#spawn a pool of threads, and pass them queue instance
for _ in range(num_threads):
if preserve_result_ordering:
t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
else:
t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
t.setDaemon(True)
t.start()

work_queue.join()
stop_logging_with_thread_info()

logging.info('work_queue joined')

result_items = []
while not result_queue.empty():
result = result_queue.get(timeout=per_sync_timeout)
logging.info('found result[:500]: ' + repr(result)[:500])
if result:
result_items.append(result)

if preserve_result_ordering:
result_items = [work_item for index, work_item in result_items]

return result_items

class ThreadedWorker(threading.Thread):
""" Generic Threaded Worker
Input to work_func: item from work_queue

Example usage:

import Queue

urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

work_queue = Queue.Queue()
result_queue = Queue.Queue()

def process_url(url):
# TODO: Do some work with the url
return url

def main():
# spawn a pool of threads, and pass them queue instance
for i in range(3):
t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
t.setDaemon(True)
t.start()

# populate queue with data
for url in urls_to_process:
work_queue.put(url)

# wait on the queue until everything has been processed
work_queue.join()

# print results
print repr(result_queue)

main()
"""

def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
threading.Thread.__init__(self)
self.work_queue = work_queue
self.result_queue = result_queue
self.work_func = work_func
self.stop_when_work_queue_empty = stop_when_work_queue_empty
self.queue_timeout = queue_timeout

def should_continue_running(self):
if self.stop_when_work_queue_empty:
return not self.work_queue.empty()
else:
return True

def run(self):
while self.should_continue_running():
try:
# grabs item from work_queue
work_item = self.work_queue.get(timeout=self.queue_timeout)

# works on item
work_result = self.work_func(work_item)

#place work_result into result_queue
self.result_queue.put(work_result, timeout=self.queue_timeout)

except Queue.Empty:
logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')

except Queue.Full:
logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')

except:
logging.exception('Error in ThreadedWorker')

finally:
#signals to work_queue that item is done
self.work_queue.task_done()

def start_logging_with_thread_info():
try:
formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
logging.getLogger().handlers[0].setFormatter(formatter)
except:
logging.exception('Failed to start logging with thread info')

def stop_logging_with_thread_info():
try:
formatter = logging.Formatter('%(message)s')
logging.getLogger().handlers[0].setFormatter(formatter)
except:
logging.exception('Failed to stop logging with thread info')

使用示例
from test import ThreadedWorker
from queue import Queue

urls_to_process = ["http://facebook.com", "http://pypix.com"]

work_queue = Queue()
result_queue = Queue()

def process_url(url):
# TODO: Do some work with the url
return url

def main():
# spawn a pool of threads, and pass them queue instance
for i in range(5):
t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
t.setDaemon(True)
t.start()

# populate queue with data
for url in urls_to_process:
work_queue.put(url)

# wait on the queue until everything has been processed
work_queue.join()

# print results
print(repr(result_queue))

main()

相关文章:

Nginx负载均衡之TCP端口高可用(二)

在前面我们实现了基本的HTTP反向代理,从互联网过来的请求已经可以分发到后端多台网站服务器上,但不是所有的业务都是网络类型的,此篇文章我们主要讨论的是TCP 端口的负载均衡做法,昨天也有小伙伴提到了,在HTTP反向代理…

语音识别大牛Daniel Povey为何加入小米?“手机+AIoT”强大生态,开源战略是关键...

整理 | 夕颜出品 | AI科技大本营(ID:rgznai100)10 月 17 日,语音识别开源工具 Kaldi 创始人,语音和 AI 领域大牛 Daniel Povey 在10 月 19 日,小米集团副总裁、集团技术委员会主席崔宝秋发布微博,欢迎 Dani…

Python运维项目中用到的redis经验及数据类型

先感叹下,学东西一定要活学活用! 我用redis也有几年的历史了,今个才想到把集合可以当python list用。 最近做了几个项目都掺杂了redis, 遇到了一些个问题和开发中提高性能的方法,这都分享出来,共同学习。…

图像边缘检测之拉普拉斯(Laplacian)C++实现

拉普拉斯算子(Laplacian)可应用到图像边缘检测中。在OpenCV中当kernel大小为3*3时,支持两种kernel算子,分别为:在OpenCV中默认的计算方式如下,假设有一个5*5的小图像,原始值依次为1,2,…25,如下图红色部分,…

从0到1,Airbnb的深度学习实践经验总结

作者 | Haldar译者 | 陆离出品 | AI科技大本营(ID: rgznai100)此前,AI科技大本营发布了关于希望通过介绍的研究成果为读者提供一些有用的帮助和指引。模型中的生态系统本文要讨论的机器学习现实应用,是关于根据用户预约的可能性来…

高并发大流量专题---8、动态语言的并发处理

高并发大流量专题---8、动态语言的并发处理 一、总结 一句话总结: 和本科毕业论文连起来了:基于消息中间件Rocket MQ的研究;用于并发处理的消息队列 1、什么是进程、线程、协程? 进程(Process)是计算机中的…

1. 文件系统——磁盘分区、各目录功能、硬盘

一、磁盘分区及文件访问入口在前文中介绍过,Linux的整个文件系统像一棵倒置的数,最顶层的是根文件系统,其下有很多一级子目录,一级子目录下面是二级子目录,依此类推:/:根目录/bin,/s…

吴恩达老师深度学习视频课笔记:逻辑回归公式推导及C++实现

逻辑回归(Logistic Regression)是一个二分分类算法。逻辑回归的目标是最小化其预测与训练数据之间的误差。为了训练逻辑回归模型中的参数w和b,需要定义一个成本函数(cost function)。成本函数(cost function):它是针对整个训练集的。衡量参数w和b在整个训…

网络运行时间提高100倍,Google使用的AI视频理解架构有多强?

译者 | 刘畅出品 | AI科技大本营(ID:rgznai100)视频理解是一个很有挑战性的问题。由于视频包含时空数据,因此图像的特征表示需要同时提取图像和运动信息。这不仅对自动理解视频语义内容有重要性,还对机器人的感知和学习也至关重要…

iOS学习笔记(十三)——获取手机信息(UIDevice、NSBundle、NSLocale)

2019独角兽企业重金招聘Python工程师标准>>> iOS的APP的应用开发的过程中,有时为了bug跟踪或者获取用反馈的需要自动收集用户设备、系统信息、应用信息等等,这些信息方便开发者诊断问题,当然这些信息是用户的非隐私信息&#xff0…

吴恩达老师深度学习视频课笔记:单隐含层神经网络公式推导及C++实现(二分类)

关于逻辑回归的公式推导和实现可以参考: http://blog.csdn.net/fengbingchun/article/details/79346691 下面是在逻辑回归的基础上,对单隐含层的神经网络进行公式推导:选择激活函数时的一些经验:不同层的激活函数可以不一样。如果…

「2019中国大数据技术大会」超值学生票来啦!

大会官网:https://t.csdnimg.cn/U1wA经过11年的沉淀与发展,中国大数据技术大会见证了大数据技术生态在中国的建立、发展和成熟,已经成为国内大数据行业极具影响力的盛会,也是大数据人非常期待的年度深度分享盛会。在新的时代背景下…

校验正确获取对象或者数组的属性方法(babel-plugin-idx/_.get)

背景: 开发中经常遇到取值属性的时候,需要校验数值的有效性。 例如: 获取props对象里面的friends属性 props.user && props.user.friends && props.user.friends[0] && props.user.friends[0].friends 对于深层的对…

Ring Tone Manager on Windows Mobile

2019独角兽企业重金招聘Python工程师标准>>> 手机铃声经常能够体现一个人的个性,有些哥们儿在自习室不把手机设置成震动,一来电就#$^%^&^%#$&$*,声音还很大,唯恐别人听不到。 Windows Mobile设备上如何来设置手…

OpenCV3.3中K-Means聚类接口简介及使用

OpenCV3.3中给出了K-均值聚类(K-Means)的实现,即接口cv::kmeans,接口的声明在include/opencv2/core.hpp文件中,实现在modules/core/src/kmeans.cpp文件中,其中:下面对此接口中的参数作个简单说明:(1)、data&#xff1a…

一文读懂对抗机器学习Universal adversarial perturbations | CSDN博文精选

作者 | Icoding_F2014来源 | CSDN博客本文提出一种 universal 对抗扰动,universal 是指同一个扰动加入到不同的图片中,能够使图片被分类模型误分类,而不管图片到底是什么。示意图:形式化的定义:对于d维数据分布 μ&…

Reactor模式与Proactor模式

博主一脚刚踏进分布式的大门(看《分布式Java应用》,如果大家有啥推荐的书欢迎留言~),发现书中对NIO采用的Reactor模式、AIO采用的Proactor模式一笔带过,好奇心趋势我找了一下文章,发现两篇挺不错的文章&…

linux下使profile和.bash_profile立即生效的方法

使profile生效的方法1.source /etc/profile使用.bash_profile生效的方法1 . .bash_profile2 source .bash_profile3 exec bash --login转载于:https://blog.51cto.com/shine20/1436473

吴恩达老师深度学习视频课笔记:多隐含层神经网络公式推导(二分类)

多隐含层神经网络的推导步骤非常类似于单隐含层神经网络的步骤,只不过是多重复几遍。关于单隐含层神经网络公式的推导可以参考: http://blog.csdn.net/fengbingchun/article/details/79370310 逻辑回归是一个浅层模型(shadow model),或称单层…

Python中的元编程:一个关于修饰器和元类的简单教程

作者 | Saurabh Kukade译者 | 刘畅出品 | AI科技大本营(ID:rgznai100)最近,作者遇到一个非常有趣的概念,它就是用 Python 进行元编程。我想在本文中分享我对该主题的见解。作者希望它可以帮助解决这个问题,因为很多人说…

获取用户电脑的上网IP地址

在项目中经常要获取用户的上网的IP地址&#xff0c;如何获取用户的IP地址&#xff0c;方法很多&#xff0c;现在介绍以下2种。 /// <summary> /// 获取本机在局域网的IP地址 /// </summary> /// <returns></returns> …

数学图形(1.40)T_parameter

不记得在哪搞了个数学公式生成的图形. vertices 1000t from 0 to (2*PI) r 2.0 x r*(5*cos(t) - cos(6*t)) y r*(3*sin(t) - sin(4*t)) 给线加上一维变量的变化,使之变成面: vertices D1:360 D2:21u from 0 to (2*PI) D1 v from 0 to 20 D2x (v2)*cos(u) - cos((v3)*u…

K-均值聚类(K-Means) C++代码实现

K-均值聚类(K-Means)简介可以参考&#xff1a; http://blog.csdn.net/fengbingchun/article/details/79276668 以下是K-Means的C实现&#xff0c;code参考OpenCV 3.3中的cv::kmeans函数&#xff0c;均值点初始化的方法仅支持KMEANS_RANDOM_CENTERS。以下是从数据集MNIST中提取…

让学生网络相互学习,为什么深度相互学习优于传统蒸馏模型?| 论文精读

作者 | Ying Zhang&#xff0c;Tao Xiang等译者 | 李杰出品 | AI科技大本营&#xff08;ID:rgznai100&#xff09;蒸馏模型是一种将知识从教师网络&#xff08;teacher&#xff09;传递到学生网络&#xff08;student&#xff09;的有效且广泛使用的技术。通常来说&#xff0c;…

mac apache 配置

mac系统自带apache这无疑给广大的开发朋友提供了便利&#xff0c;接下来是针对其中的一些说明 一、自带apache相关命令 1. sudo apachectl start 启动服务&#xff0c;需要权限&#xff0c;就是你计算机的password 2. sudo apachectl stop 终止服务 ####3. sudo apachectl rest…

jQuery学习---------认识事件处理

3种事件模型&#xff1a;原始事件模型DOM事件模型IE事件模型原始事件模型&#xff08;0级事件模型&#xff09;1、事件处理程序被定义为函数实例&#xff0c;然后绑定到DOM元素事件对象上&#xff0c;实现事件的注册。例子&#xff1a;var btn document.getElementsByTagName(…

C++中的虚函数表介绍

在C语言中,当我们使用基类的引用或指针调用一个虚成员函数时会执行动态绑定。因为我们直到运行时才能知道到底调用了哪个版本的虚函数&#xff0c;所以所有虚函数都必须有定义。通常情况下&#xff0c;如果我们不使用某个函数&#xff0c;则无须为该函数提供定义。但是我们必须…

AI如何赋能金融行业?百度、图灵深视等同台分享技术实践

近日&#xff0c;由BTCMEX举办的金融技术创新研讨会在北京举办。BTCMEX投资人李笑来&#xff0c;AI技术公司TuringPass、百度、美国Apache基金会项目Pulsar、区块链安全公司SlowMist等相关专家参加了此次会议&#xff0c;共同探讨了金融技术在创新方面的现状。 图灵深视副总裁许…

【Win32 API学习]打开可执行文件

在MFC中打开其他可执行文件常用到的方法有&#xff1a;WinExec、ShellExecute、CreatProcess。 1.WinExec WinExec 主要运行EXE文件&#xff0c;用法简单&#xff0c;只有两个参数&#xff0c;前一个指定命令路径&#xff0c;后一个指定窗口显示方式&#xff1a; UINT WinExec(…

支付宝接口使用文档说明 支付宝异步通知

支付宝接口使用文档说明 支付宝异步通知(notify_url)与return_url. 现支付宝的通知有两类。 A服务器通知&#xff0c;对应的参数为notify_url&#xff0c;支付宝通知使用POST方式 B页面跳转通知&#xff0c;对应的参数为return_url&#xff0c;支付宝通知使用GET方式 &#xff…