当前位置: 首页 > 编程日记 > 正文

Java多线程编程实战:模拟大量数据同步

背景

最近对于 Java 多线程做了一段时间的学习,笔者一直认为,学习东西就是要应用到实际的业务需求中的。否则要么无法深入理解,要么硬生生地套用技术只是达到炫技的效果。

不过笔者仍旧认为自己对于多线程掌握不够熟练,不敢轻易应用到生产代码中。这就按照平时工作中遇到的实际问题,脑补了一个很可能存在的业务场景:

已知某公司管理着 1000 个微信服务号,每个服务号有 1w ~ 50w 粉丝不等。假设该公司每天都需要将所有微信服务号的粉丝数据通过调用微信 API 的方式更新到本地数据库。

需求分析

对此需求进行分析,主要存在以下问题:

  • 单个服务号获取粉丝 id,只能每次 1w 按顺序拉取
  • 微信的 API 对于服务商的并发请求数量有限制

单个服务号获取粉丝 id,只能每次 1w 按顺序拉取。这个问题决定了单个公众号在拉取粉丝 id 上,无法分配给多个线程执行。

微信的 API 对于服务商的并发请求数量有限制。这点最容易被忽略,如果我们同时有过多的请求,则会导致接口被封禁。这里可以通过信号量来控制同时执行的线程数量。

为了尽快完成数据同步,根据实际情况:整个数据同步可分为读数据和写数据两个部分。读数据是通过 API 获取,走网络 IO,速度较慢;写数据是写到数据库,速度较快。所以得出结论:需要分配较多的线程进行读数据,较少的线程进行写数据。

设计要点

首先,我们需要确定开启多少个线程(在生产中往往是使用线程池),线程数量需要根据服务器性能来决定,这里我们定为 40 个读取数据线程(将 1000 个公众号分为 40 份,分别在 40 个线程中执行),1个写入数据线程。(具体开多少个线程,取决于线程池的容量,以及可以分配给此业务的数量。具体的数字需要根据实际情况测试得出,比服务器阈值低一些较好。当然,配置允许范围内越大越好)

其次,考虑到微信对于 API 并发请求的限制,需要限制同时执行的线程数,使用java.util.concurrent.Semaphore进行控制,这里我们限制为 20 个(具体的信号量凭证数,取决于同一时间能够执行的线程,跟 API 限制,服务器性能有关)。

然后,我们需要知道数据何时读取、写入完毕,以控制程序逻辑以及终止程序,这里我们使用java.util.concurrent.CountDownLatch进行控制。

最后,我们需要一个数据结构,用来在多个线程中共享处理的数据,此处同步数据的场景非常适合使用队列,这里我们使用线程安全的java.util.concurrent.ConcurrentLinkedQueue来进行处理。(需要注意的是,在实际开发中,队列不能够无限制地增长,这将会很快消耗掉内存,我们需要根据实际情况对队列长度做控制。例如,可以通过控制读取线程数和写入线程数的比例来控制队列的长度)

模拟代码

由于本文重点关注多线程的使用,模拟代码只体现多线程操作的方法。代码里添加了大量的注释,方便各位读者阅读理解。

JDK:1.8

import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;/*** N个线程向队列添加数据* 一个线程消费队列数据*/
public class QueueTest {private static List<String> data = Arrays.asList("a", "b", "c", "d", "e");private static final int OFFER_COUNT = 40; // 开启的线程数量private static Semaphore semaphore = new Semaphore(20); // 同一时间执行的线程数量(大多用于控制API调用次数或数据库查询连接数)public static void main(String[] args) throws InterruptedException {Queue<String> queue = new ConcurrentLinkedQueue<>(); // 处理队列,需要处理的数据,放置到此队列中CountDownLatch offerLatch = new CountDownLatch(OFFER_COUNT); // offer线程latch,每完成一个,latch减一,lacth的count为0时表示offer处理完毕CountDownLatch pollLatch = new CountDownLatch(1); // poll线程latch,latch的count为0时,表示poll处理完毕Runnable offerRunnable = () -> {try {semaphore.acquire(); // 信号量控制} catch (InterruptedException e) {e.printStackTrace();}try {for (String datum : data) {queue.offer(datum);TimeUnit.SECONDS.sleep(2); // 模拟取数据很慢的情况}} catch (InterruptedException e) {e.printStackTrace();} finally {// 在finally中执行latch.countDown()以及信号量释放,避免因异常导致没有正常释放offerLatch.countDown();semaphore.release();}};Runnable pollRunnable = () -> {int count = 0;try {while (offerLatch.getCount() > 0 || queue.size() > 0) { // 只要offer的latch未执行完,或queue仍旧有数据,则继续循环String poll = queue.poll();if (poll != null) {System.out.println(poll);count++;}// 无论是否poll到数据,均暂停一小段时间,可降低CPU消耗TimeUnit.MILLISECONDS.sleep(100);}System.out.println("total count:" + count);} catch (InterruptedException e) {e.printStackTrace();} finally {// 在finally中执行latch.countDown(),避免因异常导致没有正常释放pollLatch.countDown();}};// 启动线程(生产环境中建议使用线程池)new Thread(pollRunnable).start(); // 启动一个poll线程for (int i = 0; i < OFFER_COUNT; i++) {new Thread(offerRunnable).start();} // 模拟取数据很慢,需要开启40个线程处理// latch等待,会block主线程直到latch的count为0offerLatch.await();pollLatch.await();System.out.println("===the end===");}
}

到这里,本文结束。以上是笔者脑补的一个常见需求的解决方案。

注意:多线程编程对实际环境和需求有很大的依赖,需要根据实际的需求情况对各个参数做调整。实际在使用中,需要尽量模拟生产环境的数据情况来进行测试,对服务器执行期间的并发数,CPU、内存、网络 IO、磁盘 IO 做好观察。并适当地调低并发数,以给服务器留有处理其他请求的余量。

相关文章:

Ubuntu中Atom编辑器显示中文乱码的处理方法

在Ubuntu14.04 64位机上安装Atom&#xff0c;依次在终端输入如下命令&#xff1a; 1. $ sudo add-apt-repository ppa:webupd8team/atom 2. $ sudo apt-get update 3. $ sudo apt-get install atom处理中文乱码的问题&#xff1a; 1. 安装文泉驿正黑等相关中文字体&#…

我的世界游戏安装

2019独角兽企业重金招聘Python工程师标准>>> Minecraft 是一款沙盘独立视频游戏&#xff0c;灵感来自于Infiniminer&#xff0c;使用Java编写&#xff0c;由Markus "Notch" Persson 所建立&#xff0c;现由Mojang AB 公司开发。 这里我们介绍如何在pcDuin…

RSA签名的PSS模式

本文由云社区发表作者&#xff1a;mariolu 一、什么是PSS模式&#xff1f; 1.1、两种签名方式之一RSA-PSS PSS (Probabilistic Signature Scheme)私钥签名流程的一种填充模式。目前主流的RSA签名包括RSA-PSS和RSA-PKCS#1 v1.5。相对应PKCS&#xff08;Public Key Cryptography …

AI真人表情包、斗地主AI......DeeCamp学员做了50个好玩又实用的AI项目

8月16日&#xff0c;2019 DeeCamp人工智能训练营的结营仪式上&#xff0c;展示了600名DeeCamp学员的50个AI实践课题。 结营仪式上&#xff0c;由2019 DeeCamp学员组成的6个项目小组作为代表&#xff0c;现场展示了自己Demo成果&#xff0c;并由李开复、张潼等学术及产业导师现…

libcurl库的使用(通过libcurl库下载url图像)

1. 从http://curl.haxx.se/download.html下载libcurl源码&#xff0c;解压缩&#xff1b; 2. 通过CMake(cmake-gui)生成vs2013 x64位 CURL.sln&#xff1b; 3. 打开CURL.sln&#xff0c;编译会生成libcurl.dll动态库&#xff1b; 4. 在CURL.sln基础上&a…

SQL Server 2005/2008 用户数据库文件默认路径和默认备份路径修改方法

2019独角兽企业重金招聘Python工程师标准>>> 一直想把数据库的默认路径修改一下&#xff0c;在网上找了一下&#xff0c;真的发现有办法 &#xff0c; 特拿 来与大家共同分享。 以下仅为参照&#xff0c;如果有多个实例&#xff0c;可能会有些许不同&#xff1a; …

Linux下多线程编程互斥锁和条件变量的简单使用

Linux下的多线程遵循POSIX线程接口&#xff0c;称为pthread。编写Linux下的多线程程序&#xff0c;需要使用头文件pthread.h&#xff0c;链接时需要使用库libpthread.a。线程是进程的一个实体&#xff0c;是CPU调度和分派的基本单位&#xff0c;它是比进程更小的能独立运行的基…

03基于python玩转人工智能最火框架之TensorFlow介绍

一句话介绍: Google开源的基于数据流图的科学计算库&#xff0c;适用于机器学习 不局限于机器学习&#xff0c;但目前被大多用于机器学习等。 TensorFlow计算流图的概念图 Tensor在图中流动。 TensorFlow的含义 拆字释义: Tensor 张量(tf中数据的表征) flow 流动 张量在图中流…

赴约北大,2019 CCF大数据与计算智能大赛正式启动

8月17日&#xff0c;以“数据驱动&#xff0c;智创未来”为主题的2019 CCF大数据与计算智能大赛&#xff08;2019 CCF BDCI&#xff09;全球启动仪式&#xff0c;在北京大学英杰交流中心阳光厅正式启幕。自2013年创办以来&#xff0c;大赛已成功举办六届&#xff0c;连续获得教…

Hadoop入门(10)_通过java代码实现从本地的文件上传到Hadoop的文件系统

2019独角兽企业重金招聘Python工程师标准>>> 第一步&#xff1a;首先搭建java的编译环境。创建一个Java Project工程&#xff0c;名为upload。 第二步&#xff1a;选中所需的Jar包。 选中JRE System Library 选择BuildPath Configure Build Path 选择ha…

Caffe源码中各种依赖库的作用及简单使用

1. Boost库&#xff1a;它是一个可移植、跨平台&#xff0c;提供源代码的C库&#xff0c;作为标准库的后备。 在Caffe中用到的Boost头文件包括&#xff1a; (1)、shared_ptr.hpp&#xff1a;智能指针&#xff0c;使用它可以不需要考虑内存释放的问题&#xff1b; (2)、dat…

漫画:5分钟了解什么是动态规划?

作者 | 调皮的阿广来源 | 视学算法&#xff08;ID&#xff1a;z872561826&#xff09;动态规划&#xff0c;英文是Dynamic Programming&#xff0c;简称DP&#xff0c;擅长解决“多阶段决策问题”&#xff0c;利用各个阶段阶段的递推关系&#xff0c;逐个确定每个阶段的最优决策…

小程序大转盘红包雨营销组件

前言 商城没几个营销活动能叫商城吗&#xff1f;所以就来几个组件吧&#xff0c;写的不好轻踩&#xff0c;对你有帮助记得给个小星星哦直接上链接github链接 运行例子 git clone https://github.com/sunnie1992/soul-weapp.git 微信开发者工具打开项目 营销组件 大转盘 "p…

Windows Server 2012 RDS系列:虚拟桌面化(5)

概述&#xff1a;本次将系列地测试Windows Server 2012 远程桌面服务&#xff08;RDS&#xff09;&#xff0c;将过程进行分享&#xff0c;总的感觉比2008 R2更简单了&#xff0c;体现着2012的自动化。2012的RDS部署有标准部署和快速启动两种&#xff0c;快速启动就是自动快速配…

里程碑式成果Faster RCNN复现难?我们试了一下 | 附完整代码

作者 | 已退逼乎 来源 | 知乎【导读】2019年以来&#xff0c;除各AI 大厂私有网络范围外&#xff0c;MaskRCNN&#xff0c;CascadeRCNN 成为了支撑很多业务得以开展的基础&#xff0c;而以 Faster RCNN 为基础去复现其他的检测网络既省时又省力&#xff0c;也算得上是里程碑性成…

【跃迁之路】【725天】程序员高效学习方法论探索系列(实验阶段482-2019.2.15)...

实验说明 从2017.10.6起&#xff0c;开启这个系列&#xff0c;目标只有一个&#xff1a;探索新的学习方法&#xff0c;实现跃迁式成长实验期2年&#xff08;2017.10.06 - 2019.10.06&#xff09;我将以自己为实验对象。我将开源我的学习方法&#xff0c;方法不断更新迭代&#…

C/C++各种数据类型转换汇总

以下是Windows/Linux系统中常用的C/C各种数据类型转换汇总&#xff1a;#ifndef FBC_MESSY_TEST_DATA_TYPE_CONVERT_HPP_ #define FBC_MESSY_TEST_DATA_TYPE_CONVERT_HPP_#include <stdio.h> #include <stdlib.h> #include <iostream> #include <string>…

ASP.NET技巧:两个截取字符串的实用方法

两个截取字符串的实用方法&#xff08;超过一定长度自动换行&#xff09;1/** <summary> 2 /// 截取字符串&#xff0c;不限制字符串长度 3 /// </summary> 4 /// <param name"str">待截取的字符串</param> 5 /…

吃瓜腾讯平均月薪7.27万后,微信又出大招

腾讯最新财报一出&#xff0c;喜提热搜&#xff01;据腾讯第二季度财报显示&#xff1a;2019 年上半年腾讯有员工56310人&#xff0c;总薪酬成本为242.59亿元&#xff0c;腾讯员工平均半年薪为43.08万元。在第一季度里&#xff0c;腾讯员工平均季度薪资为21.27万元&#xff0c;…

回调函数在C/C++中的使用

回调函数就是一个通过函数指针调用的函数。假如把A函数的指针当作参数传给B函数,然后在B函数中通过A函数传进来的这个指针调用A函数&#xff0c;那么就是回调机制。A函数就是回调函数&#xff0c;而通常情况下&#xff0c;A函数是在系统符合你设定的条件下自动执行。使用回调函…

excel单元格加引号及逗号,转换为sql需要的样式

A1 B1BXQY001 ------> BXQY001,BXQY001 -----> BXQY001 在B1中输入公式&#xff1a; ""&A1&""&"," 在B2中输入公式&#xff1a; ""&A1&"" 去掉了后面的逗号。其实就是 " "&A1&…

Win7/Win8 系统下安装Oracle 10g 提示“程序异常终止,发生未知错误”的解决方法...

我的Oracle 10g版本是10.2.0.1.0&#xff0c;&#xff08;10.1同理&#xff09;选择高级安装&#xff0c;提示“程序异常终止&#xff0c;发生未知错误”。1.修改Oracle 10G\database\stage\prereq\db\refhost.xml当打开refhost.xml 后会发现有</SYSTEM> <CERTIFIED…

Caffe基础介绍

Caffe的全称应该是Convolutional Architecture for Fast Feature Embedding&#xff0c;它是一个清晰、高效的深度学习框架&#xff0c;它是开源的&#xff0c;核心语言是C&#xff0c;它支持命令行、Python和Matlab接口&#xff0c;它既可以在CPU上运行也可以在GPU上运行。它的…

飞桨博士会第三期来啦!中国深度学习技术俱乐部诚邀您加入

飞桨博士会是由百度开源深度学习平台飞桨&#xff08;PaddlePaddle&#xff09;发起的中国深度学习技术俱乐部&#xff0c;旨在打造深度学习核心开发者交流圈&#xff0c;助力会员拓展行业高端人脉、交流前沿技术。俱乐部为会员制&#xff0c;成员皆为博士生导师或博士&#xf…

canvas 拼图

效果 代码 <!DOCTYPE html> <html lang"zh_CN"> <head><meta charset"UTF-8"><title>拼图</title><script src"https://code.jquery.com/jquery-3.3.1.js"></script> </head> <body&g…

性能优化之Java(Android)代码优化

最新最准确内容建议直接访问原文&#xff1a;性能优化之Java(Android)代码优化 本文为Android性能优化的第三篇——Java(Android)代码优化。主要介绍Java代码中性能优化方式及网络优化&#xff0c;包括缓存、异步、延迟、数据存储、算法、JNI、逻辑等优化方式。(时间仓促&#…

1小时上手MaskRCNN·Keras开源实战 | 深度应用

作者 | 小宋是呢来源 | CSDN博客0. 前言介绍开源地址&#xff1a;https://github.com/matterport/Mask_RCNN个人主页&#xff1a;http://www.yansongsong.cn/MaskRCNN 是何恺明基于以往的 faster rcnn 架构提出的新的卷积网络&#xff0c;一举完成了 object instance segmentat…

MNIST数据库介绍及转换

MNIST数据库介绍&#xff1a;MNIST是一个手写数字数据库&#xff0c;它有60000个训练样本集和10000个测试样本集。它是NIST数据库的一个子集。MNIST数据库官方网址为&#xff1a;http://yann.lecun.com/exdb/mnist/ &#xff0c;也可以在windows下直接下载&#xff0c;train-im…

PostgreSQL学习笔记(1)

安装psql brew install postgresql 启动服务 brew services start postgresql 使用psql进入控制台&#xff0c;报错&#xff1a; psql: FATAL: database "<user>" does not exist 看来是没有给我的当前用户创建数据库&#xff0c;使用下面命令进入名为templat…

怎样使一个Android应用不被杀死?(整理)

2019独角兽企业重金招聘Python工程师标准>>> 方法 &#xff1a; 对于一个service&#xff0c;可以首先把它设为在前台运行&#xff1a; public void MyService.onCreate() { super.onCreate(); Notification notification new Notification(android.R.drawable.my_…