Python下基于异步消息队列的分布式计算黑科技

研究生期间主要的方向是混沌密码学,其中很重要的一部分工作就是对混沌映射产生的数据进行分析,这些数据量是非常巨大的(不然怎么做密码?),于是使用了并行计算的黑科技来加速数据统计与分析。

老板每天都在催结果,但是电脑就这点性能还能咋办!当然,对于我来说还不是极限,只能进一步思考压榨潜能!

实验室配了一台老台式机加上自己的小本本,还有一个老台式机让我做了服务器。于是考虑要从分布式计算着手了,架构如下:

分布式计算结构

该计算网络中的计算机也通过多线程跑满了CPU,本次我使用了三台机器,相当于比普通的任务提速了12倍,对于这次原本可能需要48小时的任务,可以缩减到4小时!当然也可以手动控制线程数,上图中的服务器其实都构建在一台服务器主机内,因此需要留有一点余地。

至于效果好不好?可以说目前我这套系统的性能瓶颈在服务器数据库的并发能力。

Celery 框架

Python真是好东西,使用Celery框架就可以方便的实现分布式计算了!实现分布式计算,需要三个部分:设置任务、消息队列、发布任务,其中消息队列需要通过Redis、RabbitMQ或数据库服务来实现。

设置任务

直接上简单易懂的代码,在工作目录中创建Python脚本:distributed_tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from celery import Celery
from sqllchemy import create_engine
import pandas as pd
from time import sleep

# BROKER 就是消息队列,可以使用RabbitMQ或Redis
BROKER="amqp://user:password@rabbitmq_server:port//"
# BROKER="redis://redis_server:port/0"
db = create_engine("postgresql+psycopg2://user:password@host:port/database")
app = Celery('tasks', broker=BROKER)


@app.task
def some_job(x, y):
# 延迟10秒
sleep(10)
result = x + y

# 可以通过pandas把数据保存到数据库中,也可以直接写入数据库
data = pd.DataFrame(data={'result': result}, index={1})
data.to_sql("table_name", data, db, if_exists='append', index=False)

#  也是可以直接返回数据的,但是要返回到消息队列
return result

实现上面的代码之后,在命令行中键入命令:

1
celery worker -A distributed_tasks

其中,

  • worker 是celery的参数,用来根据消息队列的数据执行从上面脚本中的工作
  • -A distributed_tasks 就是把上面脚本中的信息通知celery,主要导入的是 app

如此,本机就处于工作机的状态了,如果脚本中定义的消息队列中有消息,他就会自动获取并根据参数执行任务,这个后面可以看到。

消息队列

最简单方式是使用Redis服务,具体怎么安装这些服务就不多说了,网上资料一大把。还有很多在线消息队列也是不错的选择,比如说Amazon的消息队列等等。

值得一提的是,Redis在消息量非常小的时候具有优势,但是我为了精分任务,消息量非常大,Redis就存在性能瓶颈了,因此用RabbitMQ比较合理。

发布任务

创建用于发布任务的Python脚本:publish_task.py

1
2
3
4
5
6
7
8
9
from disturbuted import some_job

# 直接发布10个任务
# 如果启动worker的时候加上了 '--loglevel info' 参数就可以直接看到结果了
[some_job.delay(each, 1) for each in range(10)]

# 要是看不懂上面的代码就直接看下面的,一样的:
some_job.delay(1, 1)
some_job.delay(1, 2)

单机运行该脚本就可以实现多线程并行计算了,要是有多余的电脑就能看到更激动的分布式计算了。