Understanding how a thread pool runs (using ThreadPoolExecutor as an example; printing the pool's current thread count, queued task count, completed task count, and other statistics)

java · November 29, 2021

The JDK provides very convenient ways to create thread pools, such as the Executors.newXxx factory methods. Under the hood they all end up calling the following ThreadPoolExecutor constructor:

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

The thread pool is managed roughly as follows:

First the pool is created. As tasks arrive, threads are added one by one until the pool reaches corePoolSize. If tasks keep coming in after that, they are placed into the workQueue until it is full; only then does the pool create additional threads, increasing its processing capacity up to maximumPoolSize. What if tasks still keep arriving at that point? That is the handler's job: it may discard the new task, reject it, evict an existing task, and so on. Once both the queue and the pool are saturated, any thread that stays idle (its task finished and no new task arrives) for longer than keepAliveTime is terminated, so the pool gradually shrinks back down to corePoolSize. As for threadFactory, you can supply your own to set custom thread names and daemon status, which makes later troubleshooting easier.

Let's use a small test program to help understand how the pool behaves:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {

    // core 50, max 100, queue capacity 100,000
    private static ExecutorService es = new ThreadPoolExecutor(50, 100, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<Runnable>(100000));

    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 100000; i++) {
            es.execute(() -> {
                System.out.print(1);
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        ThreadPoolExecutor tpe = ((ThreadPoolExecutor) es);
        while (true) {
            System.out.println();
            int queueSize = tpe.getQueue().size();
            System.out.println("Queued tasks: " + queueSize);

            int activeCount = tpe.getActiveCount();
            System.out.println("Active threads: " + activeCount);

            long completedTaskCount = tpe.getCompletedTaskCount();
            System.out.println("Completed tasks: " + completedTaskCount);

            long taskCount = tpe.getTaskCount();
            System.out.println("Total tasks: " + taskCount);

            Thread.sleep(3000);
        }
    }
}

 

The program prints the pool's status every 3 seconds:

11111111111111111111111111111111111111111111111111
Queued tasks: 99950
Active threads: 50
Completed tasks: 0
Total tasks: 100000
1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
Queued tasks: 99849
Active threads: 50
Completed tasks: 127
Total tasks: 100000
111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
Queued tasks: 99700
Active threads: 50
Completed tasks: 250
Total tasks: 100000
111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
Queued tasks: 99550
Active threads: 50
Completed tasks: 400
Total tasks: 100000
111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
Queued tasks: 99400
Active threads: 50
Completed tasks: 550
Total tasks: 100000
111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111
Queued tasks: 99250
Active threads: 50
Completed tasks: 700
Total tasks: 100000

......

Notice that the number of active threads stays at 50 the whole time. Why does it never break through to 100? Think about it for a moment; after the description above it should be easy to see: we submit 100,000 tasks, 50 run immediately and at most 99,950 sit in a queue that can hold 100,000, so the queue never fills up and the pool never grows beyond corePoolSize. DONE!

 

Truly making Tornado asynchronous and non-blocking (gen.coroutine, ThreadPoolExecutor, Celery)

python, tornado · November 29, 2018

Truly making Tornado asynchronous and non-blocking; from: https://blog.csdn.net/u013038616/article/details/72821600

Tornado is defined as a web framework and asynchronous networking library. It has asynchronous, non-blocking capabilities that solve the request-blocking problem of the other two frameworks, so when you need concurrency you should use Tornado.

In practice, however, it is very easy to end up using Tornado as an asynchronous-yet-blocking framework, in which case it has no advantage over the other two frameworks at all. This article records how to achieve truly asynchronous, non-blocking behaviour.

The Python version used below is 2.7.13, running on a MacBook Pro (2016).

Asynchronous programming with gen.coroutine


Tornado provides two decorators:

  • tornado.web.asynchronous

  • tornado.gen.coroutine

The asynchronous decorator turns the request into a long-lived connection; you must call self.finish() manually before the response is sent.

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        # bad 
        self.write("Hello, world")

The asynchronous decorator does not call self.finish() automatically; if the request is never explicitly finished, the connection stays open and the request hangs in the pending state.

[Screenshot: request stuck pending because self.finish() was not called]

So the correct usage is: if you use asynchronous, you must finish manually.

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()
The coroutine decorator marks the request as running in coroutine mode; in plain terms, it lets you use yield together with Tornado to write asynchronous code.

Tornado implements its own coroutine protocol; you cannot use ordinary Python generators with it.

Before programming in coroutine mode you need to know how to write asynchronous functions in Tornado. Tornado offers several styles: callbacks, Futures, coroutines, and so on, of which the coroutine style is the simplest and the most widely used.

Writing a coroutine-based asynchronous function also requires the coroutine decorator:

@gen.coroutine
def sleep(self):
    yield gen.sleep(10)
    raise gen.Return([1, 2, 3, 4, 5])

This is an asynchronous function. Tornado's coroutine-style asynchronous functions have two characteristics:

  • They need the coroutine decorator.

  • The return value must be raised as an exception via raise gen.Return().

The return value is thrown as an exception because before Python 3.3 generators were not allowed to return values.

If you have used Python generators, you know that a generator does not start running until you call next() on it, so one of the things the coroutine decorator does is automatically drive the generator when the asynchronous function is called.
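As a quick, Tornado-free illustration of that point (a minimal sketch of plain generator behaviour):

def plain_generator():
    print("body started")    # nothing here runs when the generator is created
    yield 1

g = plain_generator()        # only creates the generator object
print(next(g))               # the body runs and yields 1 only once next() is called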

An obvious drawback of the coroutine approach is that it depends heavily on the third-party libraries you use: if a library does not support Tornado's asynchronous operations, no amount of coroutine plumbing helps and the request is still blocking. Here is an example to get a feel for it.

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen

tornado.options.parse_command_line()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()

class NoBlockingHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        yield gen.sleep(10)
        self.write('Non-Blocking Request')

class BlockingHandler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(10)
        self.write('Blocking Request')

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/block", BlockingHandler),
        (r"/noblock", NoBlockingHandler),
    ], autoreload=True)

if __name__ == “__main__”:
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()


To make the effect obvious, the sleep time is set to 10 seconds.

When we use the asynchronous yield gen.sleep(10), other requests are not blocked.

[Screenshot: non-blocking behaviour]

When time.sleep(10) is used instead, other requests are blocked.

[Screenshot: blocking behaviour]

Note that "asynchronous and non-blocking" here is relative to other requests; the current request is just as blocking as it would otherwise be.

Since Tornado 3.1, gen.coroutine automatically calls self.finish() to end the request, so the asynchronous decorator can be omitted.

So this way of achieving asynchronous, non-blocking behaviour depends on having plenty of asynchronous libraries that speak Tornado's protocol, which makes it rather limited in practice. Fortunately some usable async libraries do exist, as the sketch below shows.
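For instance, Tornado's own AsyncHTTPClient follows the coroutine protocol, so it can be yielded directly. A minimal sketch (the handler name and URL are placeholders of mine):

import tornado.web
from tornado import gen
from tornado.httpclient import AsyncHTTPClient

class ProxyHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        # fetch() returns a Future, so yielding it suspends this handler
        # without blocking the IOLoop; other requests keep being served.
        response = yield AsyncHTTPClient().fetch("http://example.com/")
        self.write(response.body)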

Thread-based asynchronous programming


As noted above, if you write an asynchronous function with the gen.coroutine decorator but the underlying library is not asynchronous, the response is still blocking.

Tornado has a decorator that uses a ThreadPoolExecutor to turn a blocking call into a non-blocking one. The idea is to run the blocking code on a separate thread, outside Tornado's own thread, so that Tornado itself stays non-blocking.

futures is part of the standard library in Python 3, but on Python 2 it must be installed manually: pip install futures

Test code:

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen
from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor

tornado.options.parse_command_line()

class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()

class NoBlockingHandler(tornado.web.RequestHandler):
    # each handler class gets its own pool of 4 worker threads
    executor = ThreadPoolExecutor(4)

    @run_on_executor
    def sleep(self, second):
        time.sleep(second)
        return second

    @gen.coroutine
    def get(self):
        second = yield self.sleep(5)
        self.write('noBlocking Request: {}'.format(second))

def make_app():
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/noblock", NoBlockingHandler),
    ], autoreload=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()

ThreadPoolExecutor is a high-level wrapper over the standard library's threading module; it uses threads to make blocking functions asynchronous, which solves the problem that many libraries do not support async at all.

The trade-off is that if you push a lot of heavy work through these thread-backed asynchronous functions, the Tornado process becomes slow and unresponsive; you have merely traded one problem for another.

For small, light workloads, however, this works well and keeps Tornado running asynchronously and non-blocking.

If you know up front that a function does heavy work, you should take a different approach and combine Tornado with Celery to get asynchronous, non-blocking behaviour.

Celery-based asynchronous programming

Celery is a simple, flexible, and reliable distributed system for processing large numbers of messages: a task queue focused on real-time processing that also supports task scheduling. Celery is not the only choice; you could use another task queue, but Celery is written in Python, is quick to pick up, offers a clean API, and integrates easily with Python web frameworks.

To integrate with Tornado you can use tornado-celery, which packages Celery for use inside Tornado so it can be used directly.

In actual testing, tornado-celery has not been updated for a long time, which causes requests to block forever and never return.

The fix is to downgrade:

celery to 3.1: pip install celery==3.1
pika to 0.9.14: pip install pika==0.9.14

import time
import logging
import tornado.ioloop
import tornado.web
import tornado.options
from tornado import gen

import tcelery, tasks

tornado.options.parse_command_line()
tcelery.setup_nonblocking_producer()


class MainHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        self.write("Hello, world")
        self.finish()


class CeleryHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        response = yield gen.Task(tasks.sleep.apply_async, args=[5])
        self.write('CeleryBlocking Request: {}'.format(response.result))


def make_app(): 
    return tornado.web.Application([
        (r"/", MainHandler),
        (r"/celery-block", CeleryHandler),
    ], autoreload=True)

if __name__ == "__main__":
    app = make_app()
    app.listen(8000)
    tornado.ioloop.IOLoop.current().start()

The Celery task module (imported above as tasks):

import os
import time
from celery import Celery
from tornado import gen

celery = Celery("tasks", broker="amqp://")
celery.conf.CELERY_RESULT_BACKEND = os.environ.get('CELERY_RESULT_BACKEND', 'amqp')

@celery.task
def sleep(seconds):
    time.sleep(float(seconds))
    return seconds

if __name__ == "__main__":
    celery.start()


The Celery worker runs in a separate process, independent of the Tornado process, so it does not affect Tornado's efficiency, and for complex tasks it is more effective than the thread-based approach.

Summary


Method          Pros               Cons                                   Usability
gen.coroutine   Simple, elegant    Needs async-aware libraries            ★★☆☆☆
Threads         Simple             May hurt performance                   ★★★☆☆
Celery          Good performance   Complex to operate, outdated versions  ★★★☆☆

I have not yet found an ideal asynchronous, non-blocking programming model: the usable async libraries are limited to the commonly used ones, and writing your own async library is hard.

My recommendation is to combine the thread and Celery approaches: run lightweight work on threads and heavy work in Celery. And of course, if an async library exists for what you need, that is best of all.

In Python 3 you can switch Tornado into asyncio mode and then use any library compatible with asyncio; this is probably the direction going forward. A minimal sketch follows.
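The sketch below assumes Tornado 4.x on Python 3.5+; in Tornado 5 and later the asyncio event loop is used automatically and the explicit install step is unnecessary:

import asyncio

import tornado.web
from tornado.platform.asyncio import AsyncIOMainLoop

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("Hello, world")

if __name__ == "__main__":
    AsyncIOMainLoop().install()   # run Tornado's IOLoop on top of asyncio
    app = tornado.web.Application([(r"/", MainHandler)])
    app.listen(8000)
    asyncio.get_event_loop().run_forever()   # asyncio now drives everything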


Using Python's concurrent.futures (ThreadPoolExecutor, ProcessPoolExecutor)

python · August 5, 2016

This is an article on using Python's concurrent.futures. To analyse his web-server logs, the author works through several parts of the concurrent package, and you can pick up a lot about how to use it by following how he solves the problem.

In this essay I’ll describe how to use the concurrent.futures API from Python 3.2. Since I’m still using Python 2.7, I’ll use Alex Grönholm’s back port instead.

PEP 3148 gives the motivation for the new concurrent module:

Python currently has powerful primitives to construct multi-threaded and multi-process applications but parallelizing simple operations requires a lot of work i.e. explicitly launching processes/threads, constructing a work/results queue, and waiting for completion or some other termination condition (e.g. failure, timeout). It is also difficult to design an application with a global process/thread limit when each component invents its own parallel execution strategy.

Basically, using “threading” and “multiprocessing” is harder than it should be.

The guiding problem: analyze web logs

My web site archives the daily server logs. Filenames are of the form “www.20120115.gz”. Each access is a single line in “combined log format.” Here’s an example line:

198.180.131.21 - - [25/Dec/2011:00:47:19 -0500] "GET /writings/diary/diary-rss.xml HTTP/1.1" 304 174 "-" "Mozilla/5.0 (Windows NT 5.1; rv:8.0) Gecko/20100101 Firefox/8.0"

It contains the host IP address, date, URL path, referrer information, user agent, and a few more fields.
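As a rough sketch of how those fields can be pulled apart with plain string splitting (the same approach used later in this essay; the variable names are mine):

line = '198.180.131.21 - - [25/Dec/2011:00:47:19 -0500] "GET /writings/diary/diary-rss.xml HTTP/1.1" 304 174 "-" "Mozilla/5.0 ..."'

ip_addr = line.split()[0]                      # first whitespace-separated field
timestamp = line.split("[")[1].split("]")[0]   # text between the square brackets
request = line.split('"')[1]                   # first quoted field: 'GET /path HTTP/1.1'
path = request.split(" ")[1]                   # the URL path inside the request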

I have 169 files which I want to analyze. gzcat *.gz | wc -l says there are 1,346,595 records. I’ll use this data set to show some examples of how to use concurrent.futures.

Number of accesses per day (single-threaded)

For the start, how many log events are there per day?

import glob
import gzip

for filename in glob.glob("www_logs/www.*.gz"):
    with gzip.open(filename) as f:
        num_lines = sum(1 for line in f)
    print filename.split(".")[1], num_lines

Note: gzip files didn’t support context managers until Python 2.7. If you are on Python 2.6 then you’ll get the error message

AttributeError: GzipFile instance has no attribute '__exit__'
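If you are stuck on Python 2.6, one workaround (a sketch, not something used later in this essay) is to wrap the file in contextlib.closing, which supplies the missing __exit__:

import contextlib
import gzip

# closing() calls f.close() on exit, which is all the with-block needs here
with contextlib.closing(gzip.open("www_logs/www.20120115.gz")) as f:
    num_lines = sum(1 for line in f)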

When I run that I get output which looks like:

20110801 7305
20110802 7594
20110803 7470
20110804 7348
20110805 7504
20110806 4774
20110807 4870
20110808 9815
...
20120113 18124
20120114 9245
20120115 8100
20120116 14117

That’s too detailed, and hard to interpret. A graph would be nicer. Here it is:

I seem to get more people during the work week than the weekend, and one of my other essays got on Hacker News in early January.

I made that plot using the matplotlib’s “pylab” API:

import glob
import gzip

from pylab import *
import datetime

dates = []
counts = []

for filename in glob.glob("www_logs/www.*.gz"):
    with gzip.open(filename) as f:
        num_lines = sum(1 for line in f)
    date = datetime.datetime.strptime(filename, "www_logs/www.%Y%m%d.gz")

    dates.append(date)
    counts.append(num_lines)

plot(dates, counts)
ylim(0, max(counts))
title("My website accesses")
show()

That code is a bit ugly, so I’ll clean it up a bit and conveniently put it into a form which helps transition to the parallelization code:

import glob
import gzip
import datetime
import time

def count_lines(filename):
    with gzip.open(filename) as f:
        num_lines = sum(1 for line in f)
    date = datetime.datetime.strptime(filename, "www_logs/www.%Y%m%d.gz")
    return (date, num_lines)

filenames = glob.glob("www_logs/www.*.gz")

dates = []
counts = []
for filename in filenames:
    date, count = count_lines(filename)
    dates.append(date)
    counts.append(count)

## Believe or not, but this next line does the same as the previous block(!)
# dates, counts = zip(*(count_lines(filename) for filename in filenames))

from pylab import *
plot(dates, counts)
ylim(0, max(counts))
title("My website accesses")
show()

It’s slow. Make it faster!

That code takes 5.5 seconds to read the 1.3 million lines. I have a four core machine – surely I can make better use of my hardware!

I’ll start with multiple threads. Python has supported threads since the 1990s, but as we all know, CPython has the Global Interpreter Lock which prevents multiple threads from running Python code at the same time. On the other hand, this task is doing file I/O, and gzip uncompression in code which might release the GIL. Perhaps threads will work here?

I’ll use a very standard approach. I’ll define a set of jobs, and pass that over to a thread pool. Each job takes a filename to process as input, calls the function “count_lines”, and returns the timestamp and number of lines in the file.

Here’s how you do that with the concurrent.futures API:

import glob
import gzip
import datetime
from concurrent import futures

def count_lines(filename):
    with gzip.open(filename) as f:
        num_lines = sum(1 for line in f)
    date = datetime.datetime.strptime(filename, "www_logs/www.%Y%m%d.gz")
    return (date, num_lines)

filenames = glob.glob("www_logs/www.*.gz")

dates = []
counts = []

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    for (date, count) in executor.map(count_lines, filenames):
        dates.append(date)
        counts.append(count)

from pylab import *
plot(dates, counts)
ylim(0, max(counts))
title("My website accesses")
show()

The “ThreadPoolExecutor” creates a thread pool, in this case with two workers. You can submit as many jobs as you want to this thread pool, but only two (in this case) will be processed at a time. The thread pool is also a context manager, and no more jobs can be submitted once the context is finished.

How are jobs submitted? You can either submit a job using submit() or you can submit a number of jobs using the “map() ” idiom, which is what I did here. Remember, this is a Python 3.x API so map() returns an iterator, and not a list like it does in Python 2.x.
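To make the two submission styles concrete, here is a tiny standalone sketch (not part of the log analysis): submit() returns one future per job, while map() streams results back in input order; once the with-block exits the pool is shut down.

from concurrent import futures

def square(x):
    return x * x

with futures.ThreadPoolExecutor(max_workers=2) as executor:
    future = executor.submit(square, 3)             # one job -> one Future
    print(future.result())                          # blocks until done: 9
    print(list(executor.map(square, [1, 2, 3])))    # map() returns an iterator: [1, 4, 9]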

What is “map”?

The term “map” comes from functional programming, but functional programming is not emphasized in the Python language. Instead, we more often use a list or generator comprehension, or build a list manually. The following three methods are equivalent:

>>> print [ord(c) for c in "Andrew"]
[65, 110, 100, 114, 101, 119]
>>> print map(ord, "Andrew")
[65, 110, 100, 114, 101, 119]
>>> result = []
>>> for c in "Andrew":
...   result.append(ord(c))
... 
>>> print result
[65, 110, 100, 114, 101, 119]

So “map(count_lines, filenames)” is roughly the same as:

  for filename in filenames:
    yield count_lines(filename)

and “executor.map” does the same thing, only it uses a thread in the thread pool to evaluate the function.

Also, to switch the above code to its almost exact single-threaded version, you can use the Python 2.x iterator version of “map” (itertools.imap) and rewrite the above as:

import itertools
 ...
for (date, count) in itertools.imap(count_lines, filenames):
    dates.append(date)
    counts.append(count)

But is it faster?

No. 😉

With one thread in the thread pool, the task takes 5.5 seconds. The overall time is unchanged from the unthreaded version, as we should expect.

With two worker threads, it takes 7.0 seconds – even longer than with one thread!

Three worker threads takes 7.3 seconds, and four threads takes 7.4. This is not a trend you want to see when you need to parallelize your software.

There are two likely candidates for the slowdown. The GIL is the obvious one, but perhaps my computer doesn’t handle parallel disk I/O that well.

What about multiple processes?

What I’ll do is switch from the multi-threaded version to the multi-processing version. Instead of using a thread pool, I’ll have a process pool, which uses interprocess communications to send the job request to each process and get the results:

import glob
import gzip
import datetime
from concurrent import futures

def count_lines(filename):
    with gzip.open(filename) as f:
        num_lines = sum(1 for line in f)
    date = datetime.datetime.strptime(filename, "www_logs/www.%Y%m%d.gz")
    return (date, num_lines)

filenames = glob.glob("www_logs/www.*.gz")

dates = []
counts = []
with futures.ProcessPoolExecutor(max_workers=4) as executor:
    for (date, count) in executor.map(count_lines, filenames):
        dates.append(date)
        counts.append(count)
    
from pylab import *
plot(dates, counts)
ylim(0, max(counts))
title("My website accesses")
show()

Did you see the difference? I used a “ProcessPoolExecutor” instead of a “ThreadPoolExecutor”.

With that small change, a process pool with only one worker finishes in 5.6 seconds, which is a bit slower. That’s probably due to the overhead of starting a new process and sending data back and forth.

What’s exciting is that two workers finishes in 3.6 seconds, three workers in 2.8 seconds, and four workers in 2.6 seconds. It’s obviously not great speedup (perfect scaling would be 5.5, 2.3, 1.8, and 1.1 seconds), but I end up cutting my time in half with relatively little work.

Faster, please

At this point it’s safe to assume that most of the gzip+line count code requires the GIL. A quick look at “gzip.py” tells me that, yes, that is the case.

With some non-trivial effort, I could write a specialized C extension to replace the gzip module. That’s overkill for this project. Instead, my computer has the usual unix utilities, so I’ll rewrite the “count_lines” function and let them do the work instead.

import subprocess

def count_lines(filename):
    gzcat = subprocess.Popen(["gzcat", filename],
                             stdout = subprocess.PIPE)
    wc = subprocess.Popen(["wc", "-l"],
                          stdin = gzcat.stdout,
                          stdout = subprocess.PIPE)
    num_lines = int(wc.stdout.readline())
    date = datetime.datetime.strptime(filename, "www_logs/www.%Y%m%d.gz")
    return (date, num_lines)

Using this version, my single-threaded time is 3.2 seconds, with two threads it’s 2.0 seconds, three threads is 1.8 seconds, and four threads is 1.7 seconds.

The respective times with the process pool are 3.3 seconds, 2.1 seconds, 1.8 seconds and 1.8 seconds. This means that very little time in either of these cases is spent in the GIL, and the slightly slower multiprocess times likely reflect the extra cost of starting a process and doing interprocess communication (IPC).

What are the top URLs on my site?

Okay, I admit that the previous section was overkill, but it’s fun sometimes to try out and compare different alternatives.

I want to mine my logs for more information. What are the top 10 most downloaded URLs?

This is the perfect situation for Python’s Counter container. This was added in Python 2.7; see that link for how to support older versions of Python.
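(If you do need to run on an older Python, a collections.defaultdict(int) gives roughly the same counting behaviour, minus most_common(); a sketch:)

from collections import defaultdict

counter = defaultdict(int)
for path in ["/a.html", "/b.html", "/a.html"]:
    counter[path] += 1

# rough most_common(10) replacement: sort by count, descending
top10 = sorted(counter.items(), key=lambda item: item[1], reverse=True)[:10]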

I’ll start with the simplest single-threaded version; remember that a line in the log file looks like:

198.180.131.21 - - [25/Dec/2011:00:47:19 -0500] "GET /writings/diary/diary-rss.xml HTTP/1.1" 304 174 "-" "Mozilla/5.0 (Windows NT 5.1; rv:8.0) Gecko/20100101 Firefox/8.0"

The following analysis code:

import glob
import gzip
from collections import Counter

counter = Counter()
for filename in glob.glob("www_logs/www.*.gz"):
    with gzip.open(filename) as f:
        for line in f:
            # Extract the path field from the log string
            request = line.split('"')[1]
            path = request.split(" ")[1]
            counter[path] += 1

for path, count in counter.most_common(10):
    print count, path

takes 8.9 seconds to generate this listing:

170073 /favicon.ico
93354 /writings/diary/diary-rss.xml
81513 /dss.css
78961 /images/toplogo_left.gif
78655 /images/spacer.gif
78526 /images/toplogo_right.gif
74223 /images/news_title.gif
26528 /
25349 /robots.txt
16962 /writings/NBN/python_intro/standard.css

That’s really not exciting information. In a bit, I’ll have it display counts only for the pages I actually care about.

A concurrent.futures version

We’ve determined that Python’s gzip reader uses the GIL, so it’s pointless to parallelize the above code using threads.

There’s another issue. The “counter” is a global data structure, and that can’t be shared across multiple Python processes. I’ll have to update the algorithm somewhat. I’ll let each worker function process a file and create a new counter for that file. Once it’s done, I’ll send the counter instance back to the main process for further processing.

Here’s a worker function which does that.

def count_urls(filename):
    counter = Counter()
    with gzip.open(filename) as f:
        for line in f:
            request = line.split('"')[1]
            path = request.split(" ")[1]
            counter[path] += 1
    return counter

The code in the main process has to kick off all of the jobs, collect the counters from each file, merge the counters into one, and report the top hits. The new(ish) Counter object helps make this easy because the “update()” method sums the values for shared keys instead of replacing like it would for a dictionary.
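As a quick aside, here is that summing behaviour in isolation:

>>> from collections import Counter
>>> merged = Counter({"/index.html": 2})
>>> merged.update(Counter({"/index.html": 3, "/about.html": 1}))
>>> merged
Counter({'/index.html': 5, '/about.html': 1})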

merged_counter = Counter()
filenames = glob.glob("www_logs/www.*.gz")

with futures.ProcessPoolExecutor(max_workers=4) as executor:
    for counter in executor.map(count_urls, filenames):
        merged_counter.update(counter)

for path, count in merged_counter.most_common(10):
    print count, path

(You might be asking “How does it exchange Python objects?” Answer: Through pickles.)

The above runs in 4.4 seconds, so about 1/2 the time of the single processor version. And after I fixed a bug (I used “counter” in my report, not “merged_counter”), I got identical values to the single-threaded version.

4.4 seconds is pretty good. As we saw before, Python’s gzip reader is not as fast as calling out to gzcat, so I decided to use a Popen call instead. Also, I changed the code slightly so it only reports paths which end with “.html”.

The final code runs in 3.2 seconds, and here it is:

from collections import Counter
from concurrent import futures
import glob
import gzip
import itertools
import subprocess

def count_urls(filename):
    counter = Counter()
    p = subprocess.Popen(["gzcat", filename],
                         stdout = subprocess.PIPE)
    for line in p.stdout:
        request = line.split('"')[1]
        path = request.split(" ")[1]
        if path.endswith(".html"):
            counter[path] += 1
    return counter

filenames = glob.glob("www_logs/www.*.gz")

merged_counter = Counter()
with futures.ProcessPoolExecutor(max_workers=4) as executor:
    for counter in executor.map(count_urls, filenames):
        merged_counter.update(counter)

for path, count in merged_counter.most_common(10):
    print count, path

It tells me that the 10 most popular HTML pages from my site are

15830 /Python/PyRSS2Gen.html
13722 /writings/NBN/python_intro/command_line.html
11739 /writings/NBN/threads.html
10663 /writings/NBN/validation.html
6635 /writings/diary/archive/2007/06/01/lolpython.html
4525 /writings/NBN/writing_html.html
3756 /writings/NBN/generators.html
3465 /writings/NBN/parsing_with_ply.html
2958 /writings/diary/archive/2005/04/21/screen_scraping.html
2786 /writings/NBN/blast_parsing.html

Resolving host names from IP addresses

My logs contain bare IP addresses. I’m curious about where they come from. I write about cheminformatics; are any computers from pharma companies reading my pages? To find out, I need a fully qualified domain name for each IP address. Moreover, I want to save the IP address to domain name mapping so I can use it in other analyses.

Here’s how to get the FQDN given an IP address as a string.

>>> import socket
>>> socket.getfqdn("82.94.164.162")
'dinsdale.python.org'

DNS lookups take a surprisingly long time; 0.2 seconds on my desktop, and I understand this is typical. Since I have 117,504 addresses, that may take a few hours. On the other hand, all of that time is spent waiting for the network to respond. This is easily parallelized.

socket.getfqdn() is single-threaded on a Mac

I tried at first to use multiple threads for this, but that didn’t work. No matter how many threads I used, the overall time was the same. After a wild goose chase where I suspected that my ISP throttled the number of DNS lookups, I found the problem.

The getfqdn function is a thin wrapper to socket.gethostbyaddr(), which itself is a thin layer on top of the C function “gethostbyaddr()”. In most cases, the underlying API may only be called from a single thread. A common solution is to implement a reentrant version, usually named “gethostbyaddr_r”, but the OS X developers decided that people should use a different API for that case. (“getaddrinfo … is a replacement for and provides more flexibility than the gethostbyname(3) and getservbyname(3) functions”.) The Python module only calls the single-threaded code, and uses a lock to ensure that only one thread calls it at a time.

The problem is easily solved by using a process pool instead of a thread pool.

Extracting IP addresses from a set of gzip compressed log files

The first step is to get the IP addresses which I want to convert. I only care about unique IP addresses, and don’t want to waste time looking up duplicates. The code to extract the IP addresses is straight-forward. Reading the compressed file is not the slow part, so there’s no reason to parallelize this or use an external gzip process to speed things up.

def get_unique_ip_addresses(filenames):
    # Report only the unique IP addresses in the set
    seen = set()
    
    for filename in filenames:
        with gzip.open(filename) as gzfile:
            for line in gzfile:
                # The IP address is the first word in the line
                ip_addr = line.split()[0]
                if ip_addr not in seen:
                    seen.add(ip_addr)
                    yield ip_addr

I don’t want to process the entire data set during testing and debugging. The above returns an iterator, so I use itertools.islice to get a section of 100 terms, also as an iterator:

filenames = glob.glob("www_logs/www.*.gz")
ip_addresses = itertools.islice(get_unique_ip_addresses(filenames), 1800, 1900)

(I started with the range (0, 1000), but then ran into the gethostbyaddr reentrancy problems. I didn’t want my computer to do a simple local cache lookup, so I changed the range to (1000, 1100), then (1100, 1200) and so on. This shows that it took me a while to figure out what was wrong!)

Using “executor.submit()” instead of “executor.map”

How am I going to do the parallel call? I could do a simple

with futures.ProcessPoolExecutor(max_workers=20) as executor:
    for fqdn in executor.map(socket.getfqdn, ip_addresses):
        print fqdn

but then I lose track of the original IP address, and I wanted to cache the IP address to FQDN mapping for later use. While it might be possible to use a combination of itertools.tee and itertools.izip, I decided that “map” wasn’t the right call in the first place.

The executor’s “map” function guarantees that the result order will be the same as the input order. I don’t care about the order. Instead, I’ll submit each job using the “submit()” method.

with futures.ProcessPoolExecutor(max_workers=10) as executor:
    jobs = []
    for ip_addr in ip_addresses:
        job = executor.submit(resolve_fqdn, ip_addr)
        jobs.append(job)

The submit function returns a “concurrent.futures.Future” object. For now, there are two important things about it. You can ask it for its “result()”, like this:

ip_addr, fqdn = job.result()

The “result()” method blocks until the Promise has a result. Blocking is bad for performance, so how do you know which job promises are actually ready? Use “concurrent.futures.as_completed()” for that:

for job in futures.as_completed(jobs):
    ip_addr, fqdn = job.result()
    print ip_addr, fqdn

The last piece of this puzzle is to have the actual job return a two-element tuple with both the input IP address and the resulting FQDN:

def resolve_fqdn(ip_addr):
    fqdn = socket.getfqdn(ip_addr)
    return ip_addr, fqdn

Put it all together and the code is:

from concurrent import futures
import glob
import gzip
import itertools
import socket
import time

def get_unique_ip_addresses(filenames):
    # Report only the unique IP addresses in the set
    seen = set()
    
    for filename in filenames:
        with gzip.open(filename) as gzfile:
            for line in gzfile:
                # The IP address is the first word in the line
                ip_addr = line.split()[0]
                if ip_addr not in seen:
                    seen.add(ip_addr)
                    yield ip_addr

def resolve_fqdn(ip_addr):
    fqdn = socket.getfqdn(ip_addr)
    return ip_addr, fqdn


filenames = glob.glob("www_logs/www.*.gz")
ip_addresses = itertools.islice(get_unique_ip_addresses(filenames), 1800, 1900)

with futures.ProcessPoolExecutor(max_workers=20) as executor:
    jobs = []
    for ip_addr in ip_addresses:
        job = executor.submit(resolve_fqdn, ip_addr)
        jobs.append(job)

    # Get the completed jobs whenever they are done
    for job in futures.as_completed(jobs):
        ip_addr, fqdn = job.result()
        print ip_addr, fqdn

This processes 100 IP addresses in about 2-8 seconds. The actual time is highly dependent on DNS response times from servers around the world. To reduce the variability, I increased the number of IP addresses I used for my measurements. I found that with 20 processes I could do about 50 lookups per second, and with 50 processes I could do about 90 lookups per second. I didn’t try a higher number.

Use a dictionary of futures instead of a list

What I did seems somewhat clumsy in that I send the IP address to the process, and the process sends the IP address back to me. I did that because it was easy. The module documentation shows another technique.

You can keep the jobs in a dictionary, where the key is the future object (returned by “submit()”), and its value is the information you want to save. That is, you can rewrite the above as:

with futures.ProcessPoolExecutor(max_workers=20) as executor:
    jobs = {}
    for ip_addr in ip_addresses:
        job = executor.submit(socket.getfqdn, ip_addr)
        jobs[job] = ip_addr

    # Get the completed jobs whenever they are done
    for job in futures.as_completed(jobs):
        ip_addr = jobs[job]
        fqdn = job.result()
        print ip_addr, fqdn

Notice how it doesn’t need the “resolve_fqdn” function; it can call socket.getfqdn directly.

Add a callback to the job future

The conceptual model so far is “create all the jobs” followed by “do something with the results.” This works well, except for latency. I only processed 100 IP addresses in my example. I removed the “islice()” call and asked it to process all 117,504 IP addresses in my data set. The code looked like it wasn’t working because it wasn’t giving output. As it turned out, it was still loading all of the jobs.

The concurrent module uses an asynchronous model, and just like Twisted’s Deferred and jQuery’s deferred.promise(), there’s a way to attach a callback function to a future, which will be called once the answer is ready. Here’s how it works:

with futures.ProcessPoolExecutor(max_workers=50) as executor:
    for ip_addr in ip_addresses:
        job = executor.submit(resolve_fqdn, ip_addr)
        job.add_done_callback(print_mapping)

When each job future is ready, the concurrent library will call the “print_mapping” callback, with the job result as its sole parameter:

def print_mapping(job):
    ip_addr, fqdn = job.result()
    print ip_addr, fqdn

Technical notes: The callback occurs in the same process which submitted the job, which is exactly what’s needed here. However, the documentation doesn’t say that all of the callbacks will be done from the same thread, so if you are using a thread pool then you probably want to use a thread lock around a shared resource. (sys.stdout is a shared resource, so you would need one around the print statement here. I’m using a process pool, and the concurrent process pool implementation uses a single local worker thread, so I don’t think I have to worry about contention. You should verify that.)

Here is the final callback-based code:

from concurrent import futures
import glob
import gzip
import socket

def get_unique_ip_addresses(filenames):
    # Report only the unique IP addresses in the set
    seen = set()
    
    for filename in filenames:
        with gzip.open(filename) as gzfile:
            for line in gzfile:
                # The IP address is the first word in the line
                ip_addr = line.split()[0]
                if ip_addr not in seen:
                    seen.add(ip_addr)
                    yield ip_addr

def resolve_fqdn(ip_addr):
    fqdn = socket.getfqdn(ip_addr)
    return ip_addr, fqdn

## A multi-threaded version should create a resource lock
# import threading
# write_lock = threading.Lock()

def print_mapping(job):
    ip_addr, fqdn = job.result()
    print ip_addr, fqdn

    ## A multi-threaded version should use the resource lock
    # with write_lock:
    #   print ip_addr, fqdn

filenames = glob.glob("www_logs/www.*.gz")
with futures.ProcessPoolExecutor(max_workers=50) as executor:
    for ip_addr in get_unique_ip_addresses(filenames):
        job = executor.submit(resolve_fqdn, ip_addr)
        job.add_done_callback(print_mapping)

It processed my 117,504 addresses in 1236 seconds (about 21 minutes), which means a rate of 95 per second. That’s much better than my original rate of 5 per second!

functools.partial

By the way, just like earlier, there’s no absolute need for the worker function to return the ip address. I could have written this as:

job = executor.submit(socket.getfqdn, ip_addr)
job.add_done_callback(functools.partial(print_mapping, ip_addr))

or even as an ugly-looking lambda function with a default value to get around scoping issues.

In this variation, print_mapping becomes:

def print_mapping(ip_addr, job):
    fqdn = job.result()
    print ip_addr, fqdn

where the “ip_addr” was stored by the “partial()”, and where “job” comes from the completed promise.

This approach feels more “pure”, but I find that methods like this are harder for most people to understand.
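For reference, the lambda variant mentioned above might look like this (a sketch; the default argument pins the current ip_addr for each loop iteration):

job = executor.submit(socket.getfqdn, ip_addr)
job.add_done_callback(lambda job, ip_addr=ip_addr: print_mapping(ip_addr, job))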

Who subscribes to my blog’s RSS feed?

A quick check of the list of hostnames shows that no one from AstraZeneca reads my blog from a work machine. Actually, I don’t have any accesses from them at all, which is a bit surprising since I know some of them follow what I do. They might use a blog aggregator like Google Reader, or use a home account, or perhaps AZ’s data goes through a proxy which doesn’t have the name “az” or “astrazeneca” in it.

There are requests from Roche, and Vertex, but no blog subscribers. Who then subscribes to my blog?

Here I print the hostnames for requests which fetch my blog’s RSS feed. With ‘zgrep’ it’s fast enough that I’m not going to parallelize the code.

import subprocess
import glob

hostname_table = dict(line.split() for line in open("hostnames"))

filenames = glob.glob("www_logs/www.*.gz")
p = subprocess.Popen(["zgrep", "--no-filename", "/writings/diary/diary-rss.xml"] + filenames,
                     stdout = subprocess.PIPE)

for line in p.stdout:
    ip_addr = line.split()[0]
    hostname = hostname_table[ip_addr]
    if hostname == ip_addr:
        # Couldn't find a reverse lookup; ignore
        continue
    print hostname

A quick look at the output shows a lot of requests from Amazon and Google, so I removed those, and report the results using:

% python2.7 readers.py | sort | uniq -c | sort -n | grep -v amazon | grep -v google.com

Since I have 169 days of log file, I’ll say that “avid readers” poll the URL at least once per day. That gives me:

 176 modemcable139.154-178-173.mc.videotron.ca
 184 62.197.198.100
 187 v041222.dynamic.ppp.asahi-net.or.jp
 195 modemcable147.252-178-173.mc.videotron.ca
 200 94-226-195-151.access.telenet.be
 202 217.28.199.236
 223 123.124.21.91
 223 65.52.56.128
 241 5a-m02-d1.data-hotel.net
 263 117.218.210-67.q9.net
 263 173-11-122-218-sfba.hfc.comcastbusiness.net
 274 71-222-225-175.albq.qwest.net
 332 modemcable069.85-178-173.mc.videotron.ca
 335 no-dns-yet.convergencegroup.co.uk
 337 ip-81-210-146-57.unitymediagroup.de
 338 embln.embl.de
 353 cpe-72-183-122-94.austin.res.rr.com
 365 90-227-178-245-no128.tbcn.telia.com
 370 138.194.48.143
 408 210.96-246-81.adsl-static.isp.belgacom.be
 428 k8024-02l.mc.chalmers.se
 490 cpe-70-115-243-212.satx.res.rr.com
 493 www26006u.sakura.ne.jp
 527 219.239.34.54
 534 44.186.34.193.bridgep.com
 535 5a-m02-d2.data-hotel.net
 586 5a-m02-c6.data-hotel.net
 666 168-103-109-30.albq.qwest.net
 676 y236106.dynamic.ppp.asahi-net.or.jp
 698 82-169-211-97.ip.telfort.nl
 759 w-192.cust-7150.ip.static.uno.uk.net
1071 adsl-75-23-68-58.dsl.peoril.sbcglobal.net
1217 hekate.eva.mpg.de
1223 211.103.236.94
1307 li147-78.members.linode.com
1342 static24-72-40-170.r.rev.accesscomm.ca
1346 dinsdale.python.org
1398 145.253.161.126
1771 pat1.orbitz.net
2060 artima.com
3164 148.188.1.60
3767 90-224-169-87-no128.tbcn.telia.com
4518 jervis.textdrive.com
5791 cpe-70-114-252-25.austin.res.rr.com
6171 ip21.biogen.com
8264 it18689.research.novo.dk
10919 61.135.216.104

I know who comes from one of the Max Planck Institute machines, and a big “hello!” to the readers from Novo Nordisk and Biogen – thanks for subscribing to my blog! “dinsdale.python.org” is the Planet Python aggregator, and artima.com is the Artima Developer Community; another aggregator.

I know more than 60 people read my blog posts within 12 hours of when they are posted, so this tells me that most people read blogs through a web-based aggregator (like Planet Python or Google Reader), and not through a program running on their desktop. I’m glad to know I’m not alone in doing that!

Source: http://www.dalkescientific.com/writings/diary/archive/2012/01/19/concurrent.futures.html