Python - ThreadPoolExecutor 的使用

前言

根據 Python 官方文件提到,在使用Python時如果遇到 I/O Bound 的情況,可以考慮用內建的 ThreadPoolExecutor 來解決。

  • If you want your application to make better use of the computational resources of multi-core machines, you are advised to use multiprocessing or concurrent.futures.ProcessPoolExecutor.

  • However, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.

什麼是 I/O Bound ?

下方是 wiki 的解釋

In computer science, I/O bound refers to a condition in which the time it takes to complete a computation is determined principally by the period spent waiting for input/output operations to be completed.

I/O Bound 即為程式運作的時間主要受限於花費在 I/O 行為上。

除了定義外,wiki 也給了一個有趣的結論

This circumstance arises when the rate at which data is requested is slower than the rate it is consumed or, in other words, more time is spent requesting data than processing it.

簡單來說,讀寫的時間佔大部份程式運作的時間的情況就稱為 I/O Bound。而這情況常見於 Python 廣泛的應用之一 爬蟲(Data Collecting)。

Python 雖然速度上不及 C++,在處理一般商業情況的資料其實已經算挺堪用的了,然而從爬蟲程式的運作來看,撈取資料的過程往往是最花時間的,因此 ThreadPoolExecutor 正是適合用來加速解決爬蟲遇到的 I/O Bound。

練習

下方程式碼幾乎來自於這部教學,作者解釋清楚,觀念清晰,推推。

基本設定如下,建立一個名為do_something的函式,模擬程式運作,同時加入print()來確認運行過程,也計算整體花費的時間,等等更下方的範例程式碼就加入在startend的中間。

import concurrent.futures
import time


def do_something(sec=1):
    print(f'Start sleeping {sec}')
    time.sleep(sec)
    return f'Done.. {sec}'


start = time.time()

# ------------------- #
#   write code here   #
# ------------------- #

end = time.time()

print(f'Total: {end - start:.2f} second(s).')

ThreadPoolExecutor & submit()

先來模擬執行 1 次

with concurrent.futures.ThreadPoolExecutor() as executor:
    f1 = executor.submit(do_something)
    print(f1.result())

執行 1 次的結果,花了 1.01 秒,符合預期

Start sleeping.. 1
Done.. 1
Total: 1.01 second(s).

接著來試試執行 2 次

with concurrent.futures.ThreadPoolExecutor as executor:
    f1 = executor.submit(do_something)
    f2 = executor.submit(do_something)
    print(f1.result())
    print(f2.result())

執行 2 次的結果,大約也是花了 1.01 秒

Start sleeping.. 1
Start sleeping.. 1
Done.. 1
Done.. 1
Total: 1.01 second(s).

as_complete()

那如果執行多次呢,先用 list comprehensive 接著再用as_completed()包起來

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = [executor.submit(do_something) for _ in range(10)]
    
    for f in concurrent.futures.as_completed(results):
        print(f.result())

從結果看到,同樣也是 1 秒多

Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Start sleeping.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Done.. 1
Total: 1.01 second(s).

如果秒數不同呢

with concurrent.futures.ThreadPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = [executor.submit(do_something, sec) for sec in secs]
    
    for f in concurrent.futures.as_completed(results):
        print(f.result())

在前方sleep秒數比較久的程式運行中時,後方就開始跑了,所以 1 秒的會是最快執行完的。

Start sleeping.. 5
Start sleeping.. 4
Start sleeping.. 3
Start sleeping.. 2
Start sleeping.. 1
Done.. 1
Done.. 2
Done.. 3
Done.. 4
Done.. 5
Total: 5.02 second(s).

map()

接著用 map() 來達到更簡潔的寫法,將 iterable 的 object 依序丟入 function 中。

with concurrent.futures.ThreadPoolExecutor() as executor:
    secs = [5, 4, 3, 2, 1]
    results = executor.map(do_something, secs)

同樣也是 5 秒完成。

Start sleeping 5
Start sleeping 4
Start sleeping 3
Start sleeping 2
Start sleeping 1
Total: 5.01 second(s).

實測爬蟲

一般情況

下方的程式碼用於爬取台灣證券交易所的股價資料,整體運作就是取得資料後並存在local,get_stock_price()為爬取用的函式,以下爬取 5 個日期當作範例。

import requests
import pandas as pd
import time

def get_stock_price(date:str):

    url = 'https://www.twse.com.tw/exchangeReport'
    url += f'/MI_INDEX?response=json&date={date}&type=ALL'

    res = requests.get(url)
    source = res.json()

    df = pd.DataFrame(source['data9'], columns=source['fields9'])
    df = df[df['證券代號'].str.len() == 4]
    df.reset_index(drop=True, inplace=True)
    df.to_csv(f'{date}_stock_price.csv')


dates = [
    '20221219',
    '20221220',
    '20221221',
    '20221222',
    '20221223',
]

start = time.time()

for date in dates:
    output = get_stock_price(date)

end = time.time()

print(f'Total: {end - start} second(s).')

大約花了 10.58 秒的時間,接著來看看用 ThreadPoolExecutor 會如何

Total: 10.579519987106323 second(s).

使用 ThreadPoolExecutor

整個程式碼跟上方的大同小異,就是改掉運行爬蟲的部分,從普通的 for loop 替換成 ThreadPoolExecutor 配上範例中的 map()

import requests
import pandas as pd
import time
import concurrent.futures

def get_stock_price(date:str):

    url = 'https://www.twse.com.tw/exchangeReport'
    url += f'/MI_INDEX?response=json&date={date}&type=ALL'

    res = requests.get(url)
    source = res.json()

    df = pd.DataFrame(source['data9'], columns=source['fields9'])
    df = df[df['證券代號'].str.len() == 4]
    df.reset_index(drop=True, inplace=True)
    df.to_csv(f'{date}_stock_price.csv')


dates = [
    '20221219',
    '20221220',
    '20221221',
    '20221222',
    '20221223',
]

start = time.time()

with concurrent.futures.ThreadPoolExecutor() as executor:
    results = executor.map(get_stock_price, dates)

end = time.time()

print(f'Total: {end - start} second(s).')

結果約 6.02 秒,相差約 4.5 秒左右,整體加速的 40%,還滿讚的,工作效率大幅提升。

Total: 6.024327278137207 second(s).

參考資料

https://www.youtube.com/watch?v=IEEhzQoKtQU&ab_channel=CoreySchafer https://en.wikipedia.org/wiki/I/O_bound https://docs.python.org/3/library/threading.html

Tags:
# python
# crawl
# thread