Multi thread và multi processing trong Python

Posted by Hao Do on August 10, 2022

Làm quen với thread và multi thread

IO bound –> thread (chạy đồng thời, không song song)
CPU bound –> multi processing (chạy song song)
Thread: phù hợp với các thao tác IO (đọc, ghi file)
Multi processing: phù hợp với các tác vụ CPU bound (tính toán)

Import thư viện trước khi thực hiện code

1
2
3
4
import time
import threading
import concurrent.futures
import multiprocessing

Multi thread

Thực hiện tuần tự (không có thread)

Tổng thời gian chạy là 2s, code này chạy tuần tự.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# IO bound  --> threads (not parallel)
# CPU bound --> multiprocessing (parallel)
# Threads suit I/O operations (reading and writing files).
# CPU-bound work means computation and processing.

import time


def do_something():
    """Simulate a blocking one-second task, reporting before and after."""
    print('sleeping 1 second ...')
    time.sleep(1)
    print('done sleeping')


# Take a perf_counter() and a time() reading so both clocks can be compared.
start1, start2 = time.perf_counter(), time.time()

# Run the task twice, one call after the other (no threads involved).
for _ in range(2):
    do_something()

end1, end2 = time.perf_counter(), time.time()

print(f'finish {round(end1 - start1, 2)}')
print(f'finish {round(end2 - start2, 2)}')

img

Sử dụng thread với 2 luồng chạy đồng thời

Tổng thời gian chạy khoảng 1s, vì hai luồng cùng chờ (sleep) trong một khoảng thời gian thay vì chờ lần lượt.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading

start = time.perf_counter()


def do_something():
    """Simulate a blocking one-second task, reporting before and after."""
    print('sleeping 1 second...')
    time.sleep(1)
    print('done sleeping')


# Two worker threads that both run do_something.
workers = [threading.Thread(target=do_something) for _ in range(2)]

# Start every worker before joining any of them, so their sleeps overlap.
for worker in workers:
    worker.start()

# Wait for both workers to finish before reading the clock again.
for worker in workers:
    worker.join()

finish = time.perf_counter()
print(f'Finish {round(finish - start, 2)} seconds')

img

Tạo nhiều luồng chạy đồng thời

Tổng thời gian chạy vẫn khoảng 1s với 10 luồng

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading

start = time.perf_counter()


def do_something():
    """Simulate a blocking one-second task, reporting before and after."""
    print('sleeping 1 second....')
    time.sleep(1)
    print('done sleeping')


# Ten Thread objects, all targeting do_something.
threads = [threading.Thread(target=do_something) for _ in range(10)]

# Launch all of them first ...
for worker in threads:
    worker.start()

# ... then wait for each one, so the elapsed time covers every thread.
for worker in threads:
    worker.join()

end = time.perf_counter()
print(f'finish = {round(end - start, 2)}')

img

Cách phổ biến để tạo luồng (dùng concurrent.futures)

sử dụng concurrent.futures để tạo và thực hiện luồng

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import time
import threading
import concurrent.futures

start = time.perf_counter()


def do_something(t1, t2):
    """Sleep for ``t1 + t2`` seconds, printing a message before and after.

    Nothing is returned, so the ``result()`` calls below print ``None``.
    """
    seconds = t1 + t2
    print(f'sleep {seconds} s')
    time.sleep(seconds)
    print('done sleep')


with concurrent.futures.ThreadPoolExecutor() as ex:
    # submit() schedules one call of the function and returns a future.
    futures = [ex.submit(do_something, 1, 0.5) for _ in range(2)]

    # Print the result of each future in submission order.
    for future in futures:
        print(future.result())

# Alternative using threading.Thread directly (kept for reference):
#
# threads = []
# for _ in range(10):
#     t = threading.Thread(target=do_something, args = [1, 0.5])
#     t.start()
#     threads.append(t)
# for t in threads:
#     t.join()

end = time.perf_counter()
print(f'finish = {round(end - start, 2)}')

img

Một cách khác để thực hiện đa luồng với concurrent.futures

cách này dùng khá phổ biến

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
import threading
import concurrent.futures

start = time.perf_counter()


def do_something(t):
    """Sleep for ``t`` seconds, printing a message before and after."""
    print(f'sleeping {t} second')
    time.sleep(t)
    print('done sleep')


# This is the recommended way to use threads;
# use it for reading and writing files.
with concurrent.futures.ThreadPoolExecutor() as ex:
    # map() calls do_something once per duration. The function returns
    # nothing, so every value printed by this loop is None.
    for outcome in ex.map(do_something, [1, 3, 5, 6, 2]):
        print(outcome)

end = time.perf_counter()
print(f'finish = {round(end - start, 2)}')

img

Cách sử dụng lock trong thread

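Dùng lock để tại mỗi thời điểm chỉ có một luồng được cập nhật biến dùng chung x, nhờ đó tránh race condition và kết quả cuối cùng luôn đúng.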

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import threading
import time
# Shared counter that every worker thread increments.
x = 0


def inc():
    """Increment the module-level counter ``x`` by one."""
    global x
    x += 1


def thread_task(lock, iterations=1000000):
    """Call ``inc()`` ``iterations`` times, holding ``lock`` around each call.

    Args:
        lock: a lock object usable as a context manager
            (e.g. ``threading.Lock()``), shared by all worker threads.
        iterations: how many increments to perform; defaults to 1000000.
    """
    for _ in range(iterations):
        # ``with`` releases the lock even if inc() raises, which a bare
        # acquire()/release() pair would not do.
        with lock:
            inc()


# The lock is held only for a single increment, so the worker threads take
# turns updating x and no update is lost.
def main_task():
    """Reset ``x``, run three ``thread_task`` workers, and print the elapsed time.

    All three workers share a single ``threading.Lock``.
    """
    global x
    start = time.perf_counter()
    x = 0
    lock = threading.Lock()

    # Three identical workers, each given the same lock.
    workers = [
        threading.Thread(target=thread_task, args=(lock,))
        for _ in range(3)
    ]

    for worker in workers:
        worker.start()

    # Wait for every worker so x is final before the time is reported.
    for worker in workers:
        worker.join()

    end = time.perf_counter()
    print(f'finish = {round(end - start, 2)} second')


if __name__ == '__main__':
    # Repeat the experiment ten times and show the final counter each time.
    for i in range(10):
        main_task()
        print('iteration {0}: x = {1}'.format(i, x))

img

Find number of CPU

Cách thức để xem số lượng core của CPU

1
2
import psutil
# Print the CPU count reported by psutil.
# NOTE(review): psutil.cpu_count() presumably counts logical CPUs by default;
# pass logical=False if physical cores are wanted — confirm against psutil docs.
print(psutil.cpu_count())

Multi processing

các cơ chế của multi processing

Không đồng bộ (Asynchronous): tiến trình chính không gọi join() nên không chờ các tiến trình con chạy xong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import time
import multiprocessing

start = time.perf_counter()


def do_something():
    """Simulate a blocking one-second task, reporting before and after."""
    print('sleep 1 second')
    time.sleep(1)
    print('done sleep')


if __name__ == '__main__':
    # Two child processes that both run do_something.
    children = [multiprocessing.Process(target=do_something) for _ in range(2)]

    # Asynchronous: the children are started but never joined, so the
    # elapsed time below is printed without waiting for them.
    for child in children:
        child.start()

    end = time.perf_counter()
    print(f'finish = {round(end - start, 2)}')

img

Đồng bộ (Synchronous): tiến trình chính gọi join() để chờ các tiến trình con chạy xong

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import time
import multiprocessing

start = time.perf_counter()


def do_something():
    """Simulate a blocking one-second task, reporting before and after."""
    print('sleeping 1 second')
    time.sleep(1)
    print('done sleep')


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=do_something)
    p2 = multiprocessing.Process(target=do_something)
    children = (p1, p2)

    for child in children:
        child.start()

    # Synchronous: join() makes the parent wait for both children, so the
    # elapsed time below includes their work.
    for child in children:
        child.join()

    end = time.perf_counter()
    print(f'finish = {round(end - start, 2)}')

img

Sử dụng multi processing với concurrent.futures.ProcessPoolExecutor

Sử dụng concurrent.futures.ProcessPoolExecutor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import time
import multiprocessing
import concurrent.futures

start = time.perf_counter()


def do_something(t):
    """Sleep for ``t`` seconds, printing a message before and after.

    Nothing is returned, so ``result()`` on a future for this call is ``None``.
    """
    print(f'sleeping {t} second')
    time.sleep(t)
    print('done sleep')


if __name__ == '__main__':
    # ProcessPoolExecutor is used here in place of ThreadPoolExecutor.
    with concurrent.futures.ProcessPoolExecutor() as ex:
        f1 = ex.submit(do_something, 1.5)
        f2 = ex.submit(do_something, 1.3)

        # Read the result of each future in turn, so a failure in either
        # task is surfaced here.
        print(f1.result())
        print(f2.result())

    # Alternative using multiprocessing.Process directly (kept for reference):
    #
    # list_process = []
    # for _ in range(10):
    #     p = multiprocessing.Process(target=do_something, args = [1])
    #     p.start()
    #     list_process.append(p)
    # for p in list_process:
    #     p.join()

    end = time.perf_counter()
    print(f'finish = {round(end - start, 2)}')

img

Một cách khác để gọi multi processing

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import time
import concurrent.futures
start = time.perf_counter()


def do_something(t):
    """Sleep for ``t`` seconds, printing a message before and after."""
    print(f'sleep {t} second')
    time.sleep(t)
    print('done sleep')


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as ex:
        # map() calls do_something once per duration. The function returns
        # nothing, so every value printed by this loop is None.
        for outcome in ex.map(do_something, [1, 3, 5, 6, 2]):
            print(outcome)

    end = time.perf_counter()
    print(f'finish = {round(end - start, 2)}')

img

Ghi chú

Khi bài toán nặng về tính toán (CPU bound) thì dùng multi processing; khi bài toán chủ yếu chờ IO (IO bound) thì dùng multi thread.

Full ipynb

Tài liệu tham khảo

Machine learning cơ bản

Hết.