Tornado异步笔记(三)— 持久连接 KeepAlive 简介

 tornado  Tornado异步笔记(三)— 持久连接 KeepAlive 简介已关闭评论
12月 062018
 

网上看到的3篇连载关于tornado异步的文章,写的很好,虽然有些内容有些老了,但不影响文章的借鉴意义,强烈推荐:

转自:https://www.jianshu.com/p/3cc234198567

HTTP 持久连接

HTTP通信中,client和server一问一答的方式。HTTP是基于TCP的应用层协议,通常在发送请求之前需要创建TCP连接,然后在收到响应之后会断开这个TCP连接。这就是常见的http短连接。既然有短连接,那么也有长连接。

HTTP协议最初的设计是无连接无状态的方式。为了维护状态,引入了cookie和session方式认证识别用户。早期的web开发中,为了给用户推送数据,通常使用所谓的长连接。那时的长连接还是基于短连接的方式实现,即通过client的轮询查询,在用户层面看起来连接并没有断开。随着技术的发展,又出现了Websockt和MQTT等通信协议。Websockt和MQTT则是全双工的通信协议。

相比全双工实现的长连接,我们还会在web开发中遇到伪长连接。即HTTP协议中的keepalive模式。因为HTTP设计是无连接设计,请求应答结束之后就关闭了TCP连接。在http通信中,就会有大量的新建和销毁tcp连接的过程,那怕是同一个用户同一个客户端。为了优化这种方式,HTTP提出了KeepAlive模式,即创建的tcp连接后,传输数据,server返回响应之后并不会关掉tcp连接,下一次http请求就能复用这个tcp连接。

这是一种协商式的连接,毕竟每次的http发送数据的时候,还是要单独为每个请求发送header之类的信息。相比全双工的websocket,一旦创建了连接,下一次就不需要再发送header,直接发送数据即可。因此描述http的keepalive应该是持久连接(HTTP persistent connection )更准确。

keepalive 简介

HTTP的keepalive模式提供了HTTP通信的时候复用TCP连接的协商功能。http1.0默认是关闭的,只有在http的header加入Connection: Keep-Alive才能开启。而http1.1则正相反,默认就打开了,只有显示的在header里加入Connection: close才能关闭。现在的浏览器基本都是http1.1的协议,能否使用长连接,权看服务器的支持状况了。下图说明了开启keepalive模式的持久连接与短连接的通信示意图

短连接与持久连接,图片来源网络

当开启了持久连接,就不能使用返回EOF的方式来判断数据结尾了。对于静态和动态的数据,可以使用Conent-Lenght和Transfer-Encoding`来做应用层的区分。

requests与持久连接

了解了keeplive模式,接下来我们就来使用keepalive方式。服务器使用Tornado,tornado实现了keepalive的处理,客户端我们可以分别使用同步的requests和异步的AsyncHTTPClient。

先写一个简单的服务器:

micro-server.py

import tornado.httpserver 
import tornado.ioloop 
import tornado.web 
class IndexHandler(tornado.web.RequestHandler): 
def get(self, *args, **kwargs): 
        self.finish('It works')

app = tornado.web.Application(
handlers=[
('/', IndexHandler),
],
debug=True ) if __name__ == '__main__':
server = tornado.httpserver.HTTPServer(app)
server.listen(8000)
tornado.ioloop.IOLoop().instance().start()

requests 短连接

requests不愧是一个”for human” 的软件,实现一个http客户端非常简单。

import argparse 
import requests

url = 'http://127.0.0.1:8000' 
def short_connection(): 
    resp = requests.get(url)
    print(resp.text)

    resp = requests.get(url)
    print(resp.text) 
def long_connection(): 
    pass 
if __name__ == '__main__':
    ap = argparse.ArgumentParser()
    ap.add_argument("-t", "--type", default="short")
    args = ap.parse_args()
    type_ = args.type if type_ == 'short':
        short_connection() elif type_ == 'long':
        long_connection()

运行keepalive python requests-cli.py –type=short,可以看见返回了数据,同时通过另外一个神器wireshark抓包如下:

requests 短连接

从抓包的情况来看,两次http请求,一共创建了两次tcp的握手连接和挥手断开。每次发送http数据都需要先创建tcp连接,然后就断开了连接。通常是客户端发起的断开连接。

requests 持久连接

requests的官网也说明了,基于urllib3的方式,requests百分比实现了keepalive方式,只需要创建一个客户端session即可,代码如下:

def long_connection(): s = requests.Session()

    resp = s.get(url)
    print(resp.text)

    resp = s.get(url)
    print(resp.text)

    s.close()

再次通过抓包如下图:

requests 持久连接模式

可以看到,同样也是两次http请求,只创建了一次tcp的握手和挥手。两次http请求都基于一个tcp连接。再次查看包43,可以看到下图中的报文header指定了keepalive。

http请求的数据包

AsyncHTTPClient与持久连接

tornado是一个优秀高性能异步非阻塞(non-block)web框架。如果torando的handler中也需要请求别的三方资源,使用requests的同步网络IO,将会block住整个tornado的进程。因此tornado也实现了异步的http客户端AsyncHTTPClient。

短连接

使用AsyncHTTPClient也不难,但是想要使用其异步效果,就必须把其加入事件循环中,否则只有连接的创立,而没有数据的传输就退出了。

import tornado.httpclient
import tornado.ioloop 
import time

url = 'http://127.0.0.1:8000' 
def handle_response(response):
 if response.error:
        print("Error: %s" % response.error) else:
        print(response.body)

http_client = tornado.httpclient.AsyncHTTPClient()
http_client.fetch(url, handle_response)
http_client.fetch(url, handle_response)

运行上述代码,将会看到wirshark中,创建了两次TCP连接和断开了连接,并没有发送http数据。为了发送http数据,还需要加入tornado的事件循环。即在最后一行加入tornado.ioloop.IOLoop.instance().start()

再次运行,客户端正常收到了数据,抓包如下:

async http client 短连接

抓包的结果咋一看像是持久连接,仔细一看却有两次握手和挥手的操作。的确,客户端发送异步http请求的时候,创建了两个端口49989和49990两个tcp连接。因为是异步的请求,因此先创建了两个连接,然后才发送数据,发送数据的时候都是基于所创建的端口进行的。也就是没有使用持久连接。

持久连接

AsyncHTTPClient使用持久连接也很简单。现在流行微服务架构。通常提供给客户端的服务称之为网关,网关从各种微服务中调用获取数据,通信的方式中,同步的有http和rpc,异步的有mq之类的。而http通常都是使用持久连接的方式。

下面我们介绍一下在tornado server的handler中使用async client请求微服务的资源。

再写一个简单server

#!/usr/bin/env python 
# -*- coding:utf-8 -*- 
import tornado.gen 
import tornado.httpclient 
import tornado.httpserver 
import tornado.ioloop 
import tornado.web 
class AsyncKeepAliveHandler(tornado.web.RequestHandler):
  @tornado.web.asynchronous
  @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        url = 'http://127.0.0.1:8000/'
        http_client = tornado.httpclient.AsyncHTTPClient()
    response = yield tornado.gen.Task(http_client.fetch, url) 
    print response.code print response.body
        self.finish("It works")

app = tornado.web.Application(
        handlers=[
            ('/async/keepalive', AsyncKeepAliveHandler)
        ],
        debug=True ) 
if __name__ == '__main__':
    server = tornado.httpserver.HTTPServer(app)
    server.listen(5050)
    tornado.httpclient.AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient")
    tornado.ioloop.IOLoop().instance().start()

然后我们请求5050端口的服务,也连接发送两次http请求:

(venv)☁  keepalive  curl http://127.0.0.1:5050/async/keepalive It works%                                                                                                                                                     (venv)☁  keepalive  curl http://127.0.0.1:5050/async/keepalive It works%

再看我们的抓包情况:

tornado handler使用持久连接

从图中可以看到,即使是两个请求,最终都是复用了断开为50784的tcp连接。

因为asynchttpclient默认使用的是SimpleAsyncHTTPClient,实现持久连接只需要配置一下tornado.httpclient.AsyncHTTPClient.configure(“tornado.curl_httpclient.CurlAsyncHTTPClient”)即可。当然,这个需要tornado的版本4.2以上,当前的版本是4.5。

CurlAsyncHTTPClient依赖于pycurl。pycurl又依赖libcurl。在安装pycurl的时候,可能会出现link的问题。例如ImportError: pycurl: libcurl link-time version (7.37.1) is older than compile-time version (7.43.0) 。 解决了link问题,如果是mac系统,安装的时候可能出现error: Setup script exited with error: command ‘cc’ failed,多半是由于xcode做鬼,这里有一个解决说明

AsyncHTTPClient设置成为keepalive模式是全局性的,比较tornado是单进程单线程的,访问三方或者微服务,都是一个客户端,所有的模式都是持久连接。

短连接与持久连接的应用场景

持久连接可以减少tcp连接的创建和销毁,提升服务器的处理性能。但是并不是所有连接都得使用持久连接。长短连接都有其使用场景。

既然持久连接在于连接的持久,因此对于频繁通信,点对点的就可以使用。例如网关和微服务之间。如果创建了持久连接,就必须在意连接的存活状态。客户端一般不会主动关闭,因此服务端需要维护这个连接状态,对于一些长时间没有读写事件发生的连接,可以主动断开,节省资源。

对于一些用完就走的场景,也不需要使用持久连接。而另外一些需要全双工通信,例如推送和实时应用,则需要真正的长连接,比如MQTT实现推送和websocket实现实时应用等。

总结

微服务大行其道,从微观来看,增加了更多的网络IO。而IO又是最耗时的操作。相比之下,程式的计算速度就显得没那么紧要了。优化网络IO才是提升性能的关键。一些频繁通信的场景,使用持久连接或长连接更能优化大量TCP连接的创建和销毁。

就Python的而言,Tornado的诞生就是为了解决网络IO的瓶颈。并且很多tornado及其三方库的问题,都能在github和stackoverflow找到作者的参与和回答。可见作者对项目的负责。由于tornado单线程的特性,因此做任何IO操作,都需要考虑是否block。幸好有AsyncHTTPClinet,既可以提供异步IO,也可以实现持久连接,当然,tornado也支持websocket。

Tornado异步笔记(二)— 异步客户端AsyncHTTPClient

 tornado  Tornado异步笔记(二)— 异步客户端AsyncHTTPClient已关闭评论
12月 062018
 

网上看到的3篇连载关于tornado异步的文章,写的很好,虽然有些内容有些老了,但不影响文章的借鉴意义,强烈推荐:

转自: https://www.jianshu.com/p/ff9cb6dea27a/


Tornado异步笔记(二)— 异步客户端
前面了解Tornado的异步任务的常用做法,姑且归结为异步服务。通常在我们的服务内,还需要异步的请求第三方服务。针对HTTP请求,Python的库Requests是最好用的库,没有之一。官网宣称:HTTP for Human。然而,在tornado中直接使用requests将会是一场恶梦。requests的请求会block整个服务进程。
上帝关上门的时候,往往回打开一扇窗。Tornado提供了一个基于框架本身的异步HTTP客户端(当然也有同步的客户端)— AsyncHTTPClient。
AsyncHTTPClient 基本用法
AsyncHTTPClient是 tornado.httpclinet 提供的一个异步http客户端。使用也比较简单。与服务进程一样,AsyncHTTPClient也可以callback和yield两种使用方式。前者不会返回结果,后者则会返回response。
如果请求第三方服务是同步方式,同样会杀死性能。
class SyncHandler(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):

        url = ‘https://api.github.com/’
        resp = requests.get(url)
        print resp.status_code

        self.finish(‘It works’)

使用ab测试大概如下:
Document Path:          /sync
Document Length:        5 bytes

Concurrency Level:      5
Time taken for tests:   10.255 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      985 bytes
HTML transferred:       25 bytes
Requests per second:    0.49 [#/sec] (mean)
Time per request:       10255.051 [ms] (mean)
Time per request:       2051.010 [ms] (mean, across all concurrent requests)
Transfer rate:          0.09 [Kbytes/sec] received

性能相当慢了,换成AsyncHTTPClient再测:
class AsyncHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self, *args, **kwargs):

        url = ‘https://api.github.com/’
        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch(url, self.on_response)
        self.finish(‘It works’)

    @tornado.gen.coroutine
    def on_response(self, response):
        print response.code
        

qps 提高了很多
Document Path:          /async
Document Length:        5 bytes

Concurrency Level:      5
Time taken for tests:   0.162 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      985 bytes
HTML transferred:       25 bytes
Requests per second:    30.92 [#/sec] (mean)
Time per request:       161.714 [ms] (mean)
Time per request:       32.343 [ms] (mean, across all concurrent requests)
Transfer rate:          5.95 [Kbytes/sec] received

同样,为了获取response的结果,只需要yield函数。
class AsyncResponseHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):

        url = ‘https://api.github.com/’
        http_client = tornado.httpclient.AsyncHTTPClient()
        response = yield tornado.gen.Task(http_client.fetch, url)
        print response.code
        print response.body

AsyncHTTPClient 转发
使用Tornado经常需要做一些转发服务,需要借助AsyncHTTPClient。既然是转发,就不可能只有get方法,post,put,delete等方法也会有。此时涉及到一些 headers和body,甚至还有https的waring。
下面请看一个post的例子, yield结果,通常,使用yield的时候,handler是需要 tornado.gen.coroutine。
headers = self.request.headers
body = json.dumps({‘name’: ‘rsj217’})
http_client = tornado.httpclient.AsyncHTTPClient()

resp = yield tornado.gen.Task(
    self.http_client.fetch, 
    url,
    method=”POST”, 
    headers=headers,
    body=body, 
    validate_cert=False)

AsyncHTTPClient 构造请求
如果业务处理并不是在handlers写的,而是在别的地方,当无法直接使用tornado.gen.coroutine的时候,可以构造请求,使用callback的方式。

body = urllib.urlencode(params)
req = tornado.httpclient.HTTPRequest(
  url=url, 
  method=’POST’, 
  body=body, 
  validate_cert=False)  

http_client.fetch(req, self.handler_response)

def handler_response(self, response):
    
    print response.code

用法也比较简单,AsyncHTTPClient中的fetch方法,第一个参数其实是一个HTTPRequest实例对象,因此对于一些和http请求有关的参数,例如method和body,可以使用HTTPRequest先构造一个请求,再扔给fetch方法。通常在转发服务的时候,如果开起了validate_cert,有可能会返回599timeout之类,这是一个warning,官方却认为是合理的。
AsyncHTTPClient 上传图片
AsyncHTTPClient 更高级的用法就是上传图片。例如服务有一个功能就是请求第三方服务的图片OCR服务。需要把用户上传的图片,再转发给第三方服务。
class ApiAccountUploadHandler(helper.BaseHandler):
    @tornado.gen.coroutine
    @helper.token_require
    def post(self, *args, **kwargs):
        upload_type = self.get_argument(‘type’, None)

        files_body = self.request.files[‘file’]
        
        new_file = ‘upload/new_pic.jpg’
        new_file_name = ‘new_pic.jpg’

        # 写入文件
        with open(new_file, ‘w’) as w:
            w.write(file_[‘body’])

        logging.info(‘user {} upload {}’.format(user_id, new_file_name))

        # 异步请求 上传图片
        with open(new_file, ‘rb’) as f:
            files = [(‘image’, new_file_name, f.read())]

        fields = ((‘api_key’, KEY), (‘api_secret’, SECRET))

        content_type, body = encode_multipart_formdata(fields, files)

        headers = {“Content-Type”: content_type, ‘content-length’: str(len(body))}
        request = tornado.httpclient.HTTPRequest(config.OCR_HOST,
                                                 method=”POST”, headers=headers, body=body, validate_cert=False)

        response = yield tornado.httpclient.AsyncHTTPClient().fetch(request)

def encode_multipart_formdata(fields, files):
    “””
    fields is a sequence of (name, value) elements for regular form fields.
    files is a sequence of (name, filename, value) elements for data to be
    uploaded as files.
    Return (content_type, body) ready for httplib.HTTP instance
    “””
    boundary = ‘———-ThIs_Is_tHe_bouNdaRY_$’
    crlf = ‘\r\n’
    l = []
    for (key, value) in fields:
        l.append(‘–‘ + boundary)
        l.append(‘Content-Disposition: form-data; name=”%s”‘ % key)
        l.append(”)
        l.append(value)
    for (key, filename, value) in files:
        filename = filename.encode(“utf8”)
        l.append(‘–‘ + boundary)
        l.append(
                ‘Content-Disposition: form-data; name=”%s”; filename=”%s”‘ % (
                    key, filename
                )
        )
        l.append(‘Content-Type: %s’ % get_content_type(filename))
        l.append(”)
        l.append(value)
    l.append(‘–‘ + boundary + ‘–‘)
    l.append(”)
    body = crlf.join(l)
    content_type = ‘multipart/form-data; boundary=%s’ % boundary
    return content_type, body

def get_content_type(filename):
    import mimetypes

    return mimetypes.guess_type(filename)[0] or ‘application/octet-stream’

对比上述的用法,上传图片仅仅是多了一个图片的编码。将图片的二进制数据按照multipart 方式编码。编码的同时,还需要把传递的相关的字段处理好。相比之下,使用requests 的方式则非常简单:

files = {}
f = open(‘/Users/ghost/Desktop/id.jpg’)
files[‘image’] = f
data = dict(api_key=’KEY’, api_secret=’SECRET’)
resp = requests.post(url, data=data, files=files)
f.close()
print resp.status_Code

总结
通过AsyncHTTPClient的使用方式,可以轻松的实现handler对第三方服务的请求。结合前面关于tornado异步的使用方式。无非还是两个key。是否需要返回结果,来确定使用callback的方式还是yield的方式。当然,如果不同的函数都yield,yield也可以一直传递。这个特性,tornado的中的tornado.auth 里面对oauth的认证。
大致就是这样的用法。

Tornado异步笔记(一)— 异步任务

 tornado  Tornado异步笔记(一)— 异步任务已关闭评论
12月 062018
 

网上看到的3篇连载关于tornado异步的文章,写的很好,虽然有些内容有些老了,但不影响文章的借鉴意义,强烈推荐:

转自:https://www.jianshu.com/p/31fae7dd05ba

高性能服务器Tornado
Python的web框架名目繁多,各有千秋。正如光荣属于希腊,伟大属于罗马。Python的优雅结合WSGI的设计,让web框架接口实现千秋一统。WSGI 把应用(Application)和服务器(Server)结合起来。Django 和 Flask 都可以结合 gunicon 搭建部署应用。
与 django 和 flask 不一样,tornado 既可以是 wsgi 应用,也可以是 wsgi 服务。当然,选择tornado更多的考量源于其单进程单线程异步IO的网络模式。高性能往往吸引人,可是有不少朋友使用之后会提出疑问,tornado号称高性能,实际使用的时候却怎么感受不到呢?
实际上,高性能源于Tornado基于Epoll(unix为kqueue)的异步网络IO。因为tornado的单线程机制,一不小心就容易写出阻塞服务(block)的代码。不但没有性能提高,反而会让性能急剧下降。因此,探索tornado的异步使用方式很有必要。
Tornado 异步使用方式
简而言之,Tornado的异步包括两个方面,异步服务端和异步客户端。无论服务端和客户端,具体的异步模型又可以分为回调(callback)和协程(coroutine)。具体应用场景,也没有很明确的界限。往往一个请求服务里还包含对别的服务的客户端异步请求。
服务端异步方式
服务端异步,可以理解为一个tornado请求之内,需要做一个耗时的任务。直接写在业务逻辑里可能会block整个服务。因此可以把这个任务放到异步处理,实现异步的方式就有两种,一种是yield挂起函数,另外一种就是使用类线程池的方式。请看一个同步例子:

class SyncHandler(tornado.web.RequestHandler):

    def get(self, *args, **kwargs):
        # 耗时的代码
        os.system(“ping -c 2 www.google.com”)
        self.finish(‘It works’)

使用ab测试一下:
ab -c 5 -n 5 http://127.0.0.1:5000/sync

Server Software:        TornadoServer/4.3
Server Hostname:        127.0.0.1
Server Port:            5000

Document Path:          /sync
Document Length:        5 bytes

Concurrency Level:      5
Time taken for tests:   5.076 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      985 bytes
HTML transferred:       25 bytes
Requests per second:    0.99 [#/sec] (mean)
Time per request:       5076.015 [ms] (mean)
Time per request:       1015.203 [ms] (mean, across all concurrent requests)
Transfer rate:          0.19 [Kbytes/sec] received
qps 仅有可怜的 0.99,姑且当成每秒处理一个请求吧。
下面祭出异步大法:
class AsyncHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):

        tornado.ioloop.IOLoop.instance().add_timeout(1, callback=functools.partial(self.ping, ‘www.google.com’))

        # do something others

        self.finish(‘It works’)

    @tornado.gen.coroutine
    def ping(self, url):
        os.system(“ping -c 2 {}”.format(url))
        return ‘after’
尽管在执行异步任务的时候选择了timeout 1秒,主线程的返回还是很快的。ab压测如下:
Document Path:          /async
Document Length:        5 bytes

Concurrency Level:      5
Time taken for tests:   0.009 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      985 bytes
HTML transferred:       25 bytes
Requests per second:    556.92 [#/sec] (mean)
Time per request:       8.978 [ms] (mean)
Time per request:       1.796 [ms] (mean, across all concurrent requests)
Transfer rate:          107.14 [Kbytes/sec] received

上述的使用方式,通过tornado的IO循环,把可以把耗时的任务放到后台异步计算,请求可以接着做别的计算。可是,经常有一些耗时的任务完成之后,我们需要其计算的结果。此时这种方式就不行了。车道山前必有路,只需要切换一异步方式即可。下面使用协程来改写:
class AsyncTaskHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):
        # yield 结果
        response = yield tornado.gen.Task(self.ping, ‘ www.google.com’)
        print ‘response’, response
        self.finish(‘hello’)

    @tornado.gen.coroutine
    def ping(self, url):
        os.system(“ping -c 2 {}”.format(url))
        return ‘after’
可以看到异步在处理,而结果值也被返回了。

Server Software:        TornadoServer/4.3
Server Hostname:        127.0.0.1
Server Port:            5000

Document Path:          /async/task
Document Length:        5 bytes

Concurrency Level:      5
Time taken for tests:   0.049 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      985 bytes
HTML transferred:       25 bytes
Requests per second:    101.39 [#/sec] (mean)
Time per request:       49.314 [ms] (mean)
Time per request:       9.863 [ms] (mean, across all concurrent requests)
Transfer rate:          19.51 [Kbytes/sec] received

qps提升还是很明显的。有时候这种协程处理,未必就比同步快。在并发量很小的情况下,IO本身拉开的差距并不大。甚至协程和同步性能差不多。例如你跟博尔特跑100米肯定输给他,可是如果跟他跑2米,鹿死谁手还未定呢。
yield挂起函数协程,尽管没有block主线程,因为需要处理返回值,挂起到响应执行还是有时间等待,相对于单个请求而言。另外一种使用异步和协程的方式就是在主线程之外,使用线程池,线程池依赖于futures。Python2需要额外安装。
下面使用线程池的方式修改为异步处理:
from concurrent.futures import ThreadPoolExecutor

class FutureHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(10)

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):

        url = ‘www.google.com’
        tornado.ioloop.IOLoop.instance().add_callback(functools.partial(self.ping, url))
        self.finish(‘It works’)

    @tornado.concurrent.run_on_executor
    def ping(self, url):
        os.system(“ping -c 2 {}”.format(url))

再运行ab测试:
Document Path:          /future
Document Length:        5 bytes

Concurrency Level:      5
Time taken for tests:   0.003 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      995 bytes
HTML transferred:       25 bytes
Requests per second:    1912.78 [#/sec] (mean)
Time per request:       2.614 [ms] (mean)
Time per request:       0.523 [ms] (mean, across all concurrent requests)
Transfer rate:          371.72 [Kbytes/sec] received

qps瞬间达到了1912.78。同时,可以看到服务器的log还在不停的输出ping的结果。
想要返回值也很容易。再切换一下使用方式接口。使用tornado的gen模块下的with_timeout功能(这个功能必须在tornado>3.2的版本)。
class Executor(ThreadPoolExecutor):
    _instance = None

    def __new__(cls, *args, **kwargs):
        if not getattr(cls, ‘_instance’, None):
            cls._instance = ThreadPoolExecutor(max_workers=10)
        return cls._instance

class FutureResponseHandler(tornado.web.RequestHandler):
    executor = Executor()

    @tornado.web.asynchronous
    @tornado.gen.coroutine
    def get(self, *args, **kwargs):

        future = Executor().submit(self.ping, ‘www.google.com’)

        response = yield tornado.gen.with_timeout(datetime.timedelta(10), future,
                                                  quiet_exceptions=tornado.gen.TimeoutError)

        if response:
            print ‘response’, response.result()

    @tornado.concurrent.run_on_executor
    def ping(self, url):
        os.system(“ping -c 1 {}”.format(url))
        return ‘after’

线程池的方式也可以通过使用tornado的yield把函数挂起,实现了协程处理。可以得出耗时任务的result,同时不会block住主线程。
Concurrency Level:      5
Time taken for tests:   0.043 seconds
Complete requests:      5
Failed requests:        0
Total transferred:      960 bytes
HTML transferred:       0 bytes
Requests per second:    116.38 [#/sec] (mean)
Time per request:       42.961 [ms] (mean)
Time per request:       8.592 [ms] (mean, across all concurrent requests)
Transfer rate:          21.82 [Kbytes/sec] received

qps为116,使用yield协程的方式,仅为非reponse的十分之一左右。看起来性能损失了很多,主要原因这个协程返回结果需要等执行完毕任务。
好比打鱼,前一种方式是撒网,然后就完事,不闻不问,时间当然快,后一种方式则撒网之后,还得收网,等待收网也是一段时间。当然,相比同步的方式还是快了千百倍,毕竟撒网还是比一只只钓比较快。
具体使用何种方式,更多的依赖业务,不需要返回值的往往需要处理callback,回调太多容易晕菜,当然如果需要很多回调嵌套,首先优化的应该是业务或产品逻辑。yield的方式很优雅,写法可以异步逻辑同步写,爽是爽了,当然也会损失一定的性能。
异步多样化
Tornado异步服务的处理大抵如此。现在异步处理的框架和库也很多,借助redis或者celery等,也可以把tonrado中一些业务异步化,放到后台执行。
此外,Tornado还有客户端异步功能。该特性主要是在于 AsyncHTTPClient的使用。此时的应用场景往往是tornado服务内,需要针对另外的IO进行请求和处理。顺便提及,上述的例子中,调用ping其实也算是一种服务内的IO处理。接下来,将会探索一下AsyncHTTPClient的使用,尤其是使用AsyncHTTPClient上传文件与转发请求。

真正实现tornado异步非阻塞(gen.coroutine,ThreadPoolExecutor,Celery)

 python, tornado  真正实现tornado异步非阻塞(gen.coroutine,ThreadPoolExecutor,Celery)已关闭评论
11月 292018
 

真正实现tornado异步非阻塞,来自:https://blog.csdn.net/u013038616/article/details/72821600

其中 Tornado 的定义是 Web 框架和异步网络库,其中他具备有异步非阻塞能力,能解决他两个框架请求阻塞的问题,在需要并发能力时候就应该使用 Tornado

但是在实际使用过程中很容易把 Tornado 使用成异步阻塞框架,这样对比其他两大框架没有任何优势而言,本文就如何实现真正的异步非阻塞记录。

以下使用的 Python 版本为 2.7.13 
平台为 Macbook
Pro 2016

使用 gen.coroutine 异步编程


Tornado 中两个装饰器:

  • tornado.web.asynchronous

  • tornado.gen.coroutine

asynchronous 装饰器是让请求变成长连接的方式,必须手动调用 self.finish() 才会响应

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        # bad 
        self.write("Hello, world")

asynchronous 装饰器不会自动调用self.finish() ,如果没有没有指定结束,该长连接会一直保持直到 pending 状态。 

没有调用self.finish()
所以正确是使用方式是使用了 asynchronous 需要手动 finish

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()
coroutine 装饰器是指定改请求为协程模式,说明白点就是能使用 yield 配合 Tornado 编写异步程序。

Tronado 为协程实现了一套自己的协议,不能使用 Python 普通的生成器。

在使用协程模式编程之前要知道如何编写 Tornado 中的异步函数,Tornado 提供了多种的异步编写形式:回调、Future、协程等,其中以协程模式最是简单和用的最多。

编写一个基于协程的异步函数同样需要 coroutine 装饰器

@gen.coroutine
def sleep(self):
    yield gen.sleep(10)
    raise gen.Return([1, 2, 3, 4, 5])

这就是一个异步函数,Tornado 的协程异步函数有两个特点:
  • 需要使用 coroutine 装饰器

  • 返回值需要使用 raise
    gen.Return()
     
    当做异常抛出

返回值作为异常抛出是因为在 Python
3.2
之前生成器是不允许有返回值的。

使用过 Python 生成器应该知道,想要启动生成器的话必须手动执行 next() 方法才行,所以这里的 coroutine 装饰器的其中一个作用就是在调用这个异步函数时候自动执行生成器。

使用 coroutine 方式有个很明显是缺点就是严重依赖第三方库的实现,如果库本身不支持 Tornado 的异步操作再怎么使用协程也是白搭依然会是阻塞的,放个例子感受一下。

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen

tornado.options.parse_command_line()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write(“Hello, world”)
        self.finish()

class NoBlockingHnadler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        yield gen.sleep(10)
        self.write(‘Blocking Request’)

class BlockingHnadler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(10)
        self.write(‘Blocking Request’)

def make_app():
    return tornado.web.Application([
        (r”/”, MainHandler),
        (r”/block”, BlockingHnadler),
        (r”/noblock”, NoBlockingHnadler),
    ], autoreload=True)

if __name__ == “__main__”:
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()


为了显示更明显设置了 10

当我们使用 yield
gen.sleep(10)
 
这个异步的 sleep 时候其他请求是不阻塞的。 
非阻塞效果图
当使用 time.sleep(10) 时候会阻塞其他的请求。 
阻塞效果图
这里的异步非阻塞是针对另一请求来说的,本次的请求该是阻塞的仍然是阻塞的。

gen.coroutine Tornado
3.1
后会自动调用 self.finish() 结束请求,可以不使用 asynchronous 装饰器。

所以这种实现异步非阻塞的方式需要依赖大量的基于 Tornado 协议的异步库,使用上比较局限,好在还是有一些可以用的异步库

基于线程的异步编程


使用 gen.coroutine 装饰器编写异步函数,如果库本身不支持异步,那么响应任然是阻塞的。

Tornado 中有个装饰器能使用 ThreadPoolExecutor 来让阻塞过程编程非阻塞,其原理是在 Tornado 本身这个线程之外另外启动一个线程来执行阻塞的程序,从而让 Tornado 变得阻塞。

futures Python3 是标准库,但是在 Python2 中需要手动安装 
pip
install futures

测试代码

import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor

tornado.options.parse_command_line()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write(“Hello, world”)
        self.finish()

class NoBlockingHnadler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(4)

    @run_on_executor
    def sleep(self, second):
        time.sleep(second)
        return second

    @gen.coroutine
    def get(self):
        second = yield self.sleep(5)
        self.write(‘noBlocking Request: {}’.format(second))

def make_app():
    return tornado.web.Application([
        (r”/”, MainHandler),
        (r”/noblock”, NoBlockingHnadler),
    ], autoreload=True)

if __name__ == “__main__”:
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()

ThreadPoolExecutor 是对标准库中的 threading 的高度封装,利用线程的方式让阻塞函数异步化,解决了很多库是不支持异步的问题。 

这里写图片描述
但是与之而来的问题是,如果大量使用线程化的异步函数做一些高负载的活动,会导致该 Tornado 进程性能低下响应缓慢,这只是从一个问题到了另一个问题而已。

所以在处理一些小负载的工作,是能起到很好的效果,让 Tornado 异步非阻塞的跑起来。

但是明明知道这个函数中做的是高负载的工作,那么你应该采用另一种方式,使用 Tornado 结合 Celery 来实现异步非阻塞。

基于 Celery 的异步编程

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,专注于实时处理的任务队列,同时也支持任务调度。 
Celery
并不是唯一选择,你可选择其他的任务队列来实现,但是 Celery Python 所编写,能很快的上手,同时 Celery 提供了优雅的接口,易于与 Python
Web
框架集成等特点。

Tornado 的配合可以使用 tornado-celery ,该包已经把 Celery 封装到 Tornado 中,可以直接使用。

实际测试中,由于 tornado-celery 很久没有更新,导致请求会一直阻塞,不会返回

解决办法是:

celery 降级到 3.1
pip install celery==3.1 
pika 降级到 0.9.14
pip install pika==0.9.14

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen

import tcelery, tasks

tornado.options.parse_command_line()
tcelery.setup_nonblocking_producer()


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()


class CeleryHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[5])
        self.write('CeleryBlocking Request: {}'.format(response.result))


def make_app(): 
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/celery-block", CeleryHandler),
    ], autoreload=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()
import os
import time
from celery import Celery
from tornado import gen

celery = Celery("tasks", broker="amqp://")
celery.conf.CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'amqp')

@celery.task
def sleep(seconds):
    time.sleep(float(seconds))
    return seconds

if __name__ == "__main__":
    celery.start()

这里写图片描述

Celery Worker 运行在另一个进程中,独立于 Tornado 进程,不会影响 Tornado 运行效率,在处理复杂任务时候比进程模式更有效率。

总结


方法

优点

缺点

可用性

gen.coroutine

简单、优雅

需要异步库支持

★★☆☆☆

线程

简单

可能会影响性能

★★★☆☆

Celery

性能好

操作复杂、版本低

★★★☆☆

目前没有找到最佳的异步非阻塞的编程模式,可用的异步库比较局限,只有经常用的,个人编写异步库比较困难。

推荐使用线程和 Celery 的模式进行异步编程,轻量级的放在线程中执行,复杂的放在 Celery 中执行。当然如果有异步库使用那最好不过了。

Python
3
中可以把 Tornado 设置为 asyncio 的模式,这样就使用
兼容
asyncio 模式的库,这应该是日后的方向。

Reference


celery  [‘selərɪ]  详细X

基本翻译
n. [园艺] 芹菜

网络释义
Celery: 芹菜
celery salt: 香芹盐
celery sticks: 芹菜杆

tornado文件上传下载举例

 开发  tornado文件上传下载举例已关闭评论
12月 262016
 

1. 文件上传参考:

#!/usr/bin/python
#-*- encoding:utf-8 -*-
import tornado.ioloop
import tornado.web
import shutil
import os

class UploadFileHandler(tornado.web.RequestHandler):
    def get(self):
        self.write('''
<html>
  <head><title>Upload File</title></head>
  <body>
    <form action='file' enctype="multipart/form-data" method='post'>
    <input type='file' name='file'/><br/>
    <input type='submit' value='submit'/>
    </form>
  </body>
</html>
''')

    def post(self):
        upload_path=os.path.join(os.path.dirname(__file__),'files')  #文件的暂存路径
        file_metas=self.request.files['file']    #提取表单中‘name’为‘file’的文件元数据
        for meta in file_metas:
            filename=meta['filename']
            filepath=os.path.join(upload_path,filename)
            with open(filepath,'wb') as up:      #有些文件需要已二进制的形式存储,实际中可以更改
                up.write(meta['body'])
            self.write('finished!')

app=tornado.web.Application([
    (r'/file',UploadFileHandler),
])

if __name__ == '__main__':
    app.listen(3000)
    tornado.ioloop.IOLoop.instance().start()

2. 文件下载代码参考:
def post(self,filename):
    print('i download file handler : ',filename)
    #Content-Type这里我写的时候是固定的了,也可以根据实际情况传值进来
    self.set_header ('Content-Type', 'application/octet-stream')
    self.set_header ('Content-Disposition', 'attachment; filename='+filename)
    #读取的模式需要根据实际情况进行修改
    with open(filename, 'rb') as f:
        while True:
            data = f.read(buf_size)
            if not data:
                break
            self.write(data)
    #记得有finish哦
    self.finish()

Celery,Tornado,Supervisor构建和谐的分布式系统

 python, supervisord  Celery,Tornado,Supervisor构建和谐的分布式系统已关闭评论
10月 252016
 

好文章,需要分享!

Celery 分布式的任务队列

与rabbitmq消息队列的区别与联系:

  • rabbitmq 调度的是消息,而Celery调度的是任务.
  • Celery调度任务时,需要传递参数信息,传输载体可以选择rabbitmq.
  • 利用rabbitmq的持久化和ack特性,Celery可以保证任务的可靠性.

优点:

  • 轻松构建分布式的Service Provider,提供服务。
  • 高可扩展性,增加worker也就是增加了队列的consumer。
  • 可靠性,利用消息队列的durable和ack,可以尽可能降低消息丢失的概率,当worker崩溃后,未处理的消息会重新进入消费队列。
  • 用户友好,利用flower提供的管理工具可以轻松的管理worker。

    flower

  • 使用tornado-celery,结合tornado异步非阻塞结构,可以提高吞吐量,轻松创建分布式服务框架。
  • 学习成本低,可快速入门

快速入门
定义一个celery实例main.py:

from celery import Celery app = Celery('route_check', include=['check_worker_path'], 
        broker='amqp://user:password@rabbitmq_host:port//') app.config_from_object('celeryconfig')

include指的是需要celery扫描是否有任务定义的模块路径。例如add_task就是扫描add_task.py中的任务

celery的配置文件可以从文件、模块中读取,这里是从模块中读取,celeryconfig.py为:

from multiprocessing import cpu_count from celery import platforms from kombu import Exchange, Queue

CELERYD_POOL_RESTARTS = False CELERY_RESULT_BACKEND = 'redis://:password@redis_host:port/db' CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('common_check', Exchange('route_check'), routing_key='common_check'),
    Queue('route_check', Exchange('route_check'), routing_key='route_check', delivery_mode=2),
    Queue('route_check_ignore_result', Exchange('route_check'), routing_key='route_check_ignore_result',
          delivery_mode=2)
)
CELERY_ROUTES = { 'route_check_task.check_worker.common_check': {'queue': 'common_check'}, 'route_check_task.check_worker.check': {'queue': 'route_check'}, 'route_check_task.check_worker.check_ignore_result': {'queue': 'route_check_ignore_result'}
}
CELERY_DEFAULT_QUEUE = 'default' CELERY_DEFAULT_EXCHANGE = 'default' CELERY_DEFAULT_EXCHANGE_TYPE = 'direct' CELERY_DEFAULT_ROUTING_KEY = 'default' # CELERY_MESSAGE_COMPRESSION = 'gzip' CELERY_ACKS_LATE = True CELERYD_PREFETCH_MULTIPLIER = 1 CELERY_DISABLE_RATE_LIMITS = True CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERYD_CONCURRENCY = cpu_count() / 2 CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TASK_PUBLISH_RETRY = True CELERY_TASK_PUBLISH_RETRY_POLICY = { 'max_retries': 3, 'interval_start': 10, 'interval_step': 5, 'interval_max': 20 }
platforms.C_FORCE_ROOT = True

这里面是一些celery的配置参数

在上面include的add_task.py定义如下:

#encoding:utf8 from main import app @app.task def add(x,y): return x+y

启动celery
celery -A main worker -l info -Ofair

  • -A 后面是包含celery定义的模块,我们在main.py中定义了app = Celery…
    测试celery:
  • -l 日志打印的级别,这里是info
  • Ofair 这个参数可以让Celery更好的调度任务
# encoding:utf8 __author__ = 'brianyang' import add_task

result = add_task.add.apply_async((1,2)) print type(result) print result.ready() print result.get() print result.ready()

输出是

<class 'celery.result.AsyncResult'> False 3 True

当调用result.get()时,如果还没有返回结果,将会阻塞直到结果返回。这里需要注意的是,如果需要返回worker执行的结果,必须在之前的config中配置CELERY_RESULT_BACKEND这个参数,一般推荐使用Redis来保存执行结果,如果不关心worker执行结果,设置CELERY_IGNORE_RESULT=True就可以了,关闭缓存结果可以提高程序的执行速度。
在上面的测试程序中,如果修改为:

# encoding:utf8 __author__ = 'brianyang' import add_task

result = add_task.add.(1,2) print type(result) print result

输出结果为:

<type 'int'> 3

相当于直接本地调用了add方法,并没有走Celery的调度。
通过flower的dashbord可以方便的监控任务的执行情况:

task list

task detail


还可以对worker进行重启,关闭之类的操作

taks_op


使用Celery将一个集中式的系统拆分为分布式的系统大概步骤就是:

  • 根据功能将耗时的模块拆分出来,通过注解的形式让Celery管理
  • 为拆分的模块设置独立的消息队列
  • 调用者导入需要的模块或方法,使用apply_async进行异步的调用并根据需求关注结果。
  • 根据性能需要可以添加机器或增加worker数量,方便弹性管理。

需要注意的是:

  • 尽量为不同的task分配不同的queue,避免多个功能的请求堆积在同一个queue中。
  • celery -A main worker -l info -Ofair -Q add_queue启动Celery时,可以通过参数Q加queue_name来指定该worker只接受指定queue中的tasks.这样可以使不同的worker各司其职。
  • CELERY_ACKS_LATE可以让你的Celery更加可靠,只有当worker执行完任务后,才会告诉MQ,消息被消费。
  • CELERY_DISABLE_RATE_LIMITSCelery可以对任务消费的速率进行限制,如果你没有这个需求,就关闭掉它吧,有益于会加速你的程序。

tornado-celery

tornado应该是python中最有名的异步非阻塞模型的web框架,它使用的是单进程轮询的方式处理用户请求,通过epoll来关注文件状态的改变,只扫描文件状态符发生变化的FD(文件描述符)。
由于tornado是单进程轮询模型,那么就不适合在接口请求后进行长时间的耗时操作,而是应该接收到请求后,将请求交给背后的worker去干,干完活儿后在通过修改FD告诉tornado我干完了,结果拿走吧。很明显,Celery与tornado很般配,而tornado-celery是celery官方推荐的结合两者的一个模块。
整合两者很容易,首先需要安装:

  • tornado-celery
  • tornado-redis
    tornado代码如下:
# encoding:utf8 __author__ = 'brianyang' import tcelery import tornado.gen import tornado.web from main import app import add_task

tcelery.setup_nonblocking_producer(celery_app=app) class CheckHandler(tornado.web.RequestHandler):  @tornado.web.asynchronous  @tornado.gen.coroutine def get(self): x = int(self.get_argument('x', '0'))
        y = int(self.get_argument('y', '0'))
        response = yield tornado.gen.Task(add_task.add.apply_async, args=[x, y])
        self.write({'results': response.result})
        self.finish


application = tornado.web.Application([
    (r"/add", CheckHandler),
]) if __name__ == "__main__":
    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

在浏览器输入:http://127.0.0.1:8889/add?x=1&y=2
结果为:

通过tornado+Celery可以显著的提高系统的吞吐量。

Benchmark

使用Jmeter进行压测,60个进程不间断地的访问服务器:
接口单独访问响应时间一般在200~400ms

  • uwsgi + Flask方案:
    uwsgi关键配置:

    processes       = 10 threads         = 3

    Flask负责接受并处理请求,压测结果:
    qps是46,吞吐量大概是2700/min

    uwsgi+Flask

  • tornado+Celery方案:
    Celery配置:
    CELERYD_CONCURRENCY = 10也就是10个worker(进程),压测结果:
    qps是139,吞吐量大概是8300/min

    tornado+Celery

    从吞吐量和接口相应时间各方面来看,使用tornado+Celery都能带来更好的性能。

Supervisor

  • 什么是supervisor
    supervisor俗称Linux后台进程管理器
  • 适合场景
    — 需要长期运行程序,除了nohup,我们有更好的supervisor
    — 程序意外挂掉,需要重启,让supervisor来帮忙
    — 远程管理程序,不想登陆服务器,来来来,supervisor提供了高大上的web操作界面.
    之前启动Celery命令是celery -A main worker -l info -Ofair -Q common_check,当你有10台机器的时候,每次更新代码后,都需要登陆服务器,然后更新代码,最后再杀掉Celery进程重启,恶不恶心,简直恶心死了。
    让supervisor来,首先需要安装:
    pip install supervisor
    配置文件示例:
[unix_http_server]
file=/tmp/supervisor.sock   ; path to your socket file
chmod=0777 username=admin
password=admin

[inet_http_server]
port=0.0.0.0:2345 username=admin
password=admin

[supervisord]
logfile=/var/log/supervisord.log ; supervisord log file
logfile_maxbytes=50MB       ; maximum size of logfile before rotation
logfile_backups=10 ; number of backed up logfiles
loglevel=info               ; info, debug, warn, trace
pidfile=/var/run/supervisord.pid ; pidfile location
nodaemon=false ; run supervisord as a daemon
minfds=1024 ; number of startup file descriptors
minprocs=200 ; number of process descriptors
user=root                   ; default user
childlogdir=/var/log/            ; where child log files will live

[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface

[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets. username=admin
password=admin
[program:celery]
command=celery -A main worker -l info -Ofair

directory=/home/q/celeryTest
user=root
numprocs=1 stdout_logfile=/var/log/worker.log stderr_logfile=/var/log/worker.log autostart=true autorestart=true startsecs=10 ; Need to wait for currently executing tasks to finish at shutdown.
; Increase this if you have very long running tasks.
stopwaitsecs = 10 ; When resorting to send SIGKILL to the program to terminate it
; send SIGKILL to its whole process group instead,
; taking care of its children as well.
killasgroup=true ; Set Celery priority higher than default (999) ; so, if rabbitmq is supervised, it will start first.
priority=1000

示例文件很长,不要怕,只需要复制下来,改改就可以
比较关键的几个地方是:

[inet_http_server]
port=0.0.0.0:2345 username=admin
password=admin

这个可以让你通过访问http://yourhost:2345,验证输入admin/admin的方式远程管理supervisor,效果如下:

remote supervisor


[program:flower]这里就是你要托管给supervisor的程序的一些配置,其中autorestart=true可以在程序崩溃时自动重启进程,不信你用kill试试看。
剩下的部分就是一些日志位置的设置,当前工作目录设置等,so esay~

supervisor优点:

  • 管理进程简单,再也不用nohup & kill了。
  • 再也不用担心程序挂掉了
  • web管理很方便

缺点:

  • web管理虽然方便,但是每个页面只能管理本机的supervisor,如果我有一百台机器,那就需要打开100个管理页面,太麻烦了.

怎么办~

supervisor-easy闪亮登场

通过rpc调用获取配置中的每一个supervisor程序的状态并进行管理,可以分组,分机器进行批量/单个的管理。方便的不要不要的。来两张截图:

  • 分组管理:

    group

  • 分机器管理:

    server

    通过简单的配置,可以方便的进行管理。

转自:http://www.jianshu.com/p/8f43ee526ae3

tornado平台中使用微信证书退款(https证书退款)( [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed解决)

 python  tornado平台中使用微信证书退款(https证书退款)( [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed解决)已关闭评论
9月 272016
 

使用tornado的httpclient做微信支付开发时,如果需要调用退款等需要证书验证的api时,可以使用下面的函数做证书验证:
(微信提供下载的证书文件有: apiclient_cert.p12, apiclient_cert.pem,apiclient_key.pem,rootca.pem)

from tornado import httpclient 
def get_response_httpclient_withcert(url,body,method, header = None, client_key=None, client_cert=None, ca_certs=None):
    http_client = httpclient.HTTPClient()
    request = httpclient.HTTPRequest(url, headers = header, body=body, method=method, client_key=client_key, client_cert=client_cert, connect_timeout=20.0, request_timeout=20.0)

    #如果出现类似“xxxxxx/tornado/iostream.py[line:1276] WARNING :SSL Error on 14 (‘x.x.x.x.’, 443): [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:590)”这样的错误,可以将ca_certs的参数也带上(即带上rootca.pem),使用下面的语句: 
    #request = httpclient.HTTPRequest(url, headers = header, body=body, method=method, client_key=client_key,     client_cert=client_cert, ca_certs=ca_certs, connect_timeout=20.0, request_timeout=20.0) 
    response = http_client.fetch(request)
    return response.body

调用举例:
module_path = os.path.dirname(__file__) 
client_cert_path = os.path.abspath(os.path.join(module_path, “../keys/apiclient_cert.pem”))
client_key_path = os.path.abspath(os.path.join(module_path, “../keys/apiclient_key.pem”))
 ca_certs_path = os.path.abspath(os.path.join(module_path, “../keys/rootca_wx.pem”))
resp = webRequrestUtil.get_response_httpclient_withcert(‘https://api.mch.weixin.qq.com/secapi/pay/refund’, “<xml> xxxxxxxxx</xml>”, “POST”, client_key=client_key_path, client_cert=client_cert_path, ca_certs=ca_certs_path)

 

使用tornado的coroutine进行编程

 python  使用tornado的coroutine进行编程已关闭评论
8月 102016
 

tornado 的gen.coroutine编程,自己还要好好消化下。

在tornado3发布之后,强化了coroutine的概念,在异步编程中,替代了原来的gen.engine, 变成现在的gen.coroutine。这个装饰器本来就是为了简化在tornado中的异步编程。避免写回调函数, 使得开发起来更加符合正常逻辑思维。一个简单的例子如下:

class MaindHandler(web.RequestHandler):
    @asynchronous
    @gen.coroutine
    def post(self):
        client = AsyncHTTPClient()
        resp = yield client.fetch(https://api.github.com/users")
        if resp.code == 200:
            resp = escape.json_decode(resp.body)
            self.write(json.dumps(resp, indent=4, separators=(',', ':')))
        else:
            resp = {"message": "error when fetch something"}
            self.write(json.dumps(resp, indent=4, separators={',', ':')))
        self.finish()

在yield语句之后,ioloop将会注册该事件,等到resp返回之后继续执行。这个过程是异步的。在这里使用json.dumps,而没有使用tornado自带的escape.json_encode,是因为在构建REST风格的API的时候,往往会从浏览器里访问获取JSON格式的数据。使用json.dumps格式化数据之后,在浏览器端显示查看的时候会更加友好。Github API就是这一风格的使用者。其实escape.json_encode就是对json.dumps的简单包装,我在提pull request要求包装更多功能的时候,作者的回答escape并不打算提供全部的json功能,使用者可以自己直接使用json模块。

Gen.coroutine原理

在之前一篇博客中讲到要使用tornado的异步特性,必须使用异步的库。否则单个进程阻塞,根本不会达到异步的效果。 Tornado的异步库中最常用的就是自带的AsyncHTTPClient,以及在其基础上实现的OpenID登录验证接口。另外更多的异步库可以在这里找到。包括用的比较多的MongoDB的Driver。

在3.0版本之后,gen.coroutine模块显得比较突出。coroutine装饰器可以让本来靠回调的异步编程看起来像同步编程。其中便是利用了Python中生成器的Send函数。在生成器中,yield关键字往往会与正常函数中的return相比。它可以被当成迭代器,从而使用next()返回yield的结果。但是生成器还有另外一个用法,就是使用send方法。在生成器内部可以将yield的结果赋值给一个变量,而这个值是通过外部的生成器client来send的。举一个例子:

def test_yield():
    pirnt "test yeild"
    says = (yield)
    print says

if __name__ == "__main__":
    client = test_yield()
    client.next()
    client.send("hello world")

输出结果如下:

test yeild
hello world

已经在运行的函数会挂起,直到调用它的client使用send方法,原来函数继续运行。而这里的gen.coroutine方法就是异步执行需要的操作,然后等待结果返回之后,再send到原函数,原函数则会继续执行,这样就以同步方式写的代码达到了异步执行的效果。

Tornado异步编程

使用coroutine实现函数分离的异步编程。具体如下:

@gen.coroutine
def post(self):
    client = AsyncHTTPClient()
    resp = yield client.fetch("https://api.github.com/users")
    if resp == 200:
        body = escape.json_decode(resy.body)
    else:
        body = {"message": "client fetch error"}
        logger.error("client fetch error %d, %s" % (resp.code, resp.message))
    self.write(escape.json_encode(body))
    self.finish()

换成函数之后可以变成这样;

@gen.coroutime
def post(self):
    resp = yield GetUser()
    self.write(resp)

@gen.coroutine
def GetUser():
    client = AsyncHTTPClient()
    resp = yield client.fetch("https://api.github.com/users")
    if resp.code == 200:
        resp = escape.json_decode(resp.body)
    else:
        resp = {"message": "fetch client error"}
        logger.error("client fetch error %d, %s" % (resp.code, resp.message))
    raise gen.Return(resp)

这里,当把异步封装在一个函数中的时候,并不是像普通程序那样使用return关键字进行返回,gen模块提供了一个gen.Return的方法。是通过raise方法实现的。这个也是和它是使用生成器方式实现有关的。

使用coroutine跑定时任务

Tornado中有这么一个方法:

tornado.ioloop.IOLoop.instance().add_timeout()

该方法是time.sleep的非阻塞版本,它接受一个时间长度和一个函数这两个参数。表示多少时间之后调用该函数。在这里它是基于ioloop的,因此是非阻塞的。该方法在客户端长连接以及回调函数编程中使用的比较多。但是用它来跑一些定时任务却是无奈之举。通常跑定时任务也没必要使用到它。但是我在使用heroku的时候,发现没有注册信用卡的话仅仅能够使用一个简单Web Application的托管。不能添加定时任务来跑。于是就想出这么一个方法。在这里,我主要使用它隔一段时间通过Github API接口去抓取数据。大自使用方法如下:

  • 装饰器

    def sync_loop_call(delta=60 * 1000):
      """
      Wait for func down then process add_timeout
      """
          def wrap_loop(func):
              @wraps(func)
              @gen.coroutine
              def wrap_func(*args, **kwargs):
                  options.logger.info("function %r start at %d" %
                                      (func.__name__, int(time.time())))
                  try:
                      yield func(*args, **kwargs)
                  except Exception, e:
                      options.logger.error("function %r error: %s" %
                                           (func.__name__, e))
                  options.logger.info("function %r end at %d" %
                                      (func.__name__, int(time.time())))
                  tornado.ioloop.IOLoop.instance().add_timeout(
                      datetime.timedelta(milliseconds=delta),
                      wrap_func)
              return wrap_func
          return wrap_loop
  • 任务函数

    @sync_loop_call(delta=10 * 1000)
      def worker():
          """
          Do something
          """
  • 添加任务

    if __name__ == "__main__":
      worker()
      app.listen(options.port)
      tornado.ioloop.IOLoop.instance().start()

这样做之后,当Web Application启动之后,定时任务就会随着跑起来,而且因为它是基于事件的,并且异步执行的,所以并不会影响Web服务的正常运行,当然任务不能是阻塞的或计算密集型的。我这里主要是抓取数据,而且用的是Tornado自带的异步抓取方法。

在sync_loop_call装饰器中,我在wrap_func函数上加了@gen.coroutine装饰器,这样就保证只有yeild的函数执行完之后,才会执行add_timeout操作。如果没有@gen.coroutine装饰器。那么不等到yeild返回,就会执行add_timeout了。

完整地例子可以参见我的Github,这个项目搭建在heroku上。用于展示Github用户活跃度排名和用户区域分布情况。可以访问Github-Data查看。由于国内heroku被墙,需要翻墙才能访问。

总结

Tornado是一个非阻塞的web服务器以及web框架,但是在使用的时候只有使用异步的库才会真正发挥它异步的优势,当然有些时候因为App本身要求并不是很高,如果不是阻塞特别严重的话,也不会有问题。另外使用coroutine模块进行异步编程的时候,当把一个功能封装到一个函数中时,在函数运行中,即使出现错误,如果没有去捕捉的话也不会抛出,这在调试上显得非常困难。

来自:http://simple-is-better.com/news/1017