python的分布式任務huey如何實現(xiàn)異步化任務講解
來源:易賢網(wǎng) 閱讀:2790 次 日期:2015-03-17 10:31:40
溫馨提示:易賢網(wǎng)小編為您整理了“python的分布式任務huey如何實現(xiàn)異步化任務講解”,方便廣大網(wǎng)友查閱!

本文我們來分享一個python的輕型的任務隊列程序,他可以讓python的分布式任務huey實現(xiàn)異步化任務,感興趣的朋友可以看看。

一個輕型的任務隊列,功能和相關的broker沒有celery強大,重在輕型,而且代碼讀起來也比較的簡單。

關于huey的介紹: (比celery輕型,比mrq、rq要好用 ?。?/P>

a lightweight alternative.

written in python

no deps outside stdlib, except redis (or roll your own backend)

support for django

supports:

multi-threaded task execution

scheduled execution at a given time

periodic execution, like a crontab

retrying tasks that fail

task result storage

安裝:

代碼如下:

Installing

huey can be installed very easily using pip.

pip install huey

huey has no dependencies outside the standard library, but currently the only fully-implemented queue backend it ships with requires redis. To use the redis backend, you will need to install the python client.

pip install redis

Using git

If you want to run the very latest, feel free to pull down the repo from github and install by hand.

git clone

cd huey

python setup.py install

You can run the tests using the test-runner:

python setup.py test

關于huey的api,下面有詳細的介紹及參數(shù)介紹的。

代碼如下:

from huey import RedisHuey, crontab

huey = RedisHuey('my-app', host='redis.myapp.com')

@huey.task()

def add_numbers(a, b):

return a + b

@huey.periodic_task(crontab(minute='0', hour='3'))

def nightly_backup():

sync_all_data()

juey作為woker的時候,一些cli參數(shù)。

常用的是:

-l 關于日志文件的執(zhí)行 。

-w workers的數(shù)目,-w的數(shù)值大了,肯定是增加任務的處理能力

-p --periodic 啟動huey worker的時候,他會從tasks.py里面找到 需要crontab的任務,會派出幾個線程專門處理這些事情。

-n 不啟動關于crontab里面的預周期執(zhí)行,只有你觸發(fā)的時候,才會執(zhí)行周期星期的任務。

--threads 意思你懂的。

1

代碼如下:

# 原文:

The following table lists the options available for the consumer as well as their default values.

-l, --logfile

Path to file used for logging. When a file is specified, by default Huey will use a rotating file handler (1MB / chunk) with a maximum of 3 backups. You can attach your own handler (huey.logger) as well. The default loglevel is INFO.

-v, --verbose

Verbose logging (equates to DEBUG level). If no logfile is specified and verbose is set, then the consumer will log to the console. This is very useful for testing/debugging.

-q, --quiet

Only log errors. The default loglevel for the consumer is INFO.

-w, --workers

Number of worker threads, the default is 1 thread but for applications that have many I/O bound tasks, increasing this number may lead to greater throughput.

-p, --periodic

Indicate that this consumer process should start a thread dedicated to enqueueing “periodic” tasks (crontab-like functionality). This defaults to True, so should not need to be specified in practice.

-n, --no-periodic

Indicate that this consumer process should not enqueue periodic tasks.

-d, --delay

When using a “polling”-type queue backend, the amount of time to wait between polling the backend. Default is 0.1 seconds.

-m, --max-delay

The maximum amount of time to wait between polling, if using weighted backoff. Default is 10 seconds.

-b, --backoff

The amount to back-off when polling for results. Must be greater than one. Default is 1.15.

-u, --utc

Indicates that the consumer should use UTC time for all tasks, crontabs and scheduling. Default is True, so in practice you should not need to specify this option.

--localtime

Indicates that the consumer should use localtime for all tasks, crontabs and scheduling. Default is False.

Examples

Running the consumer with 8 threads, a logfile for errors only, and a very short polling interval:

huey_consumer.py my.app.huey -l /var/log/app.huey.log -w 8 -b 1.1 -m 1.0

任務隊列huey 是靠著redis來實現(xiàn)queue的任務存儲,所以需要咱們提前先把redis-server和redis-py都裝好。 安裝的方法就不說了,自己搜搜吧。

我們首先創(chuàng)建下huey的鏈接實例 :

代碼如下:

# config.py

from huey import Huey

from huey.backends.redis_backend import RedisBlockingQueue

queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)

huey = Huey(queue)

然后就是關于任務的,也就是你想讓誰到任務隊列這個圈子里面,和celey、rq,mrq一樣,都是用tasks.py表示的。

代碼如下:

from config import huey # import the huey we instantiated in config.py

@huey.task()

def count_beans(num):

print '-- counted %s beans --' % num

名單1

再來一個真正去執(zhí)行的 。 main.py 相當于生產(chǎn)者,tasks.py相當于消費者的關系。 main.py負責喂數(shù)據(jù)。

代碼如下:

main.py

from config import huey # import our "huey" object

from tasks import count_beans # import our task

if __name__ == '__main__':

beans = raw_input('How many beans? ')

count_beans(int(beans))

print 'Enqueued job to count %s beans' % beans

Ensure you have Redis running locally

Ensure you have installed huey

Start the consumer: huey_consumer.py main.huey (notice this is “main.huey” and not “config.huey”).

Run the main program: python main.py

和celery、rq一樣,他的結果獲取是需要在你的config.py或者主代碼里面指明他的存儲的方式,現(xiàn)在huey還僅僅是支持redis,但相對他的特點和體積,這已經(jīng)很足夠了 !

只是那幾句話而已,導入RedisDataStore庫,申明下存儲的地址。

代碼如下:

from huey import Huey

from huey.backends.redis_backend import RedisBlockingQueue

from huey.backends.redis_backend import RedisDataStore # ADD THIS LINE

queue = RedisBlockingQueue('test-queue', host='localhost', port=6379)

result_store = RedisDataStore('results', host='localhost', port=6379) # ADDED

huey = Huey(queue, result_store=result_store) # ADDED result store

這個時候,我們在ipython再次去嘗試的時候,會發(fā)現(xiàn)可以獲取到tasks.py里面的return值了 其實你在main.py里面獲取的時候,他還是通過uuid從redis里面取出來的。

代碼如下:

>>> from main import count_beans

>>> res = count_beans(100)

>>> res # what is "res" ?

<huey.api.AsyncData object at 0xb7471a4c>

>>> res.get() # get the result of this task

'Counted 100 beans'

huey也是支持celey的延遲執(zhí)行和crontab的功能 。 這些功能很是重要,可以自定義的優(yōu)先級或者不用再借助linux本身的crontab。

用法很簡單,多加一個delay的時間就行了,看了下huey的源碼,他默認是立馬執(zhí)行的。當然還是要看你的線程是否都是待執(zhí)行的狀態(tài)了。

代碼如下:

>>> import datetime

>>> res = count_beans.schedule(args=(100,), delay=60)

>>> res

<huey.api.AsyncData object at 0xb72915ec>

>>> res.get() # this returns None, no data is ready

>>> res.get() # still no data...

>>> res.get(blocking=True) # ok, let's just block until its ready

'Counted 100 beans'

名單

更多信息請查看IT技術專欄

更多信息請查看網(wǎng)絡編程

2026上岸·考公考編培訓報班

  • 報班類型
  • 姓名
  • 手機號
  • 驗證碼
關于我們 | 聯(lián)系我們 | 人才招聘 | 網(wǎng)站聲明 | 網(wǎng)站幫助 | 非正式的簡要咨詢 | 簡要咨詢須知 | 新媒體/短視頻平臺 | 手機站點 | 投訴建議
工業(yè)和信息化部備案號:滇ICP備2023014141號-1 云南省教育廳備案號:云教ICP備0901021 滇公網(wǎng)安備53010202001879號 人力資源服務許可證:(云)人服證字(2023)第0102001523號
聯(lián)系電話:0871-65099533/13759567129 獲取招聘考試信息及咨詢關注公眾號:hfpxwx
咨詢QQ:1093837350(9:00—18:00)版權所有:易賢網(wǎng)