Многонишково програмиране с Python#

План на лекцията:

  • Скалиране от 1 към много

  • Какво е нишка ?

  • Работа с нишки (Threads)

  • Python GIL

  • Threads, round 2

  • “Многопроцесност”

  • asyncio

  • Примери

Скалиране от 1 към много#

Нека се върнем към едни по-прости времена - когато процесорът (да го наречем “v1”) може да изпълнява една-единствена задача. Нека освен този прост процесор, да имаме и набор от задачи - [t1, t2, t3]. Процесорът взима първата задача, изпълнява я, взема втората задача, изпълнява нея, и така докато не мине през всички задачи.

Single core

Нека си представим, че задачата t2 отнема повече време, защото трябва да изчака за някакъв ресурс (например файлова операция). В това време, нашия процесор v1 чака, и не прави никакви изчисления. Това със сигурност не е оптимално.

Какво е конкурентност ?#

Нека да вземем един нов, по-добър процесор (“v2”), който може да разпознае кога даден процес чака за ресурс, и в такава ситуация да даде приоритет на друг процес. В тази ситуация, когато t2 чака за ресурс, процесорът може да пусне t3 по-рано. Когато ресурсът на t2 стане наличен, процесорът ще спре t3, и ще даде време обратно на t2.

Това изпълнение, ни позволява да изпълним задачите си по-бързо - вместо да чакаме t2 да приключи, това време в чакане може да се използва за изпълнението на t3.

Този модел на изпълнение, в който изпълнението на следващата задача не е нужно да изчаква приключването на текущата, се нарича “конкурентно изпълнение”. Важно е да се отбележи, че конкурентното изпълнение не е едновременно (паралелно) изпълнение на задачи - продължаваме да изпълняваме по една задача в даден момент, просто редът им не е детерминиран. Ние не знаем в какъв ред ще се изпълнят задачите и коя от тях ще бъде изпълнена първа.

Какво е паралелизъм ?#

Колкото и оптимално да е раздаването на процесорно време от нашия v2 процесор, той все още не може да прави паралелни изчисления. Затова нека въведем нашия “v3” процесор - той ще е два v2 процесора “залепени” един за друг. Това би означавало, че нашия v3 процесор може да изпълнява две задачи едновременно (паралелно).

Модерните процесори#

Модерните процесори са изградени от няколко ядра (8, 10, 16), които могат да изпълняват различни задачи паралелно. В идеалния свят, ако нашата задача отнема t време на 1 ядро, то на n ядра, би трябвало да отнеме t/n време.

Какво е нишка ?#

Нишката може да бъде разглеждана като най-малката смислена поредица от инструкции, които да бъдат изпълнени. Един процес съдържа една или повече нишки. Нишките могат да комуникират една с друга, с помощта на споделена памет или с т.нар. канали.

Съществуват два вида нишки - нишки на ядрото (kernel threads) и потребителски нишки (user threads). Нишките на ядрото са менажирани от kernel-а - той се грижи за предаването на изпълнението от една нишка на друга. Kernel нишките споделят една и съща памет и файлови ресурси. Всяка нишка обаче има собствен стек, копие на регистрите и специална памет за конкретната нишка.

User нишките (наричани още “зелени” нишки, green threads) се управляват на ниво потребителски процес. Ядрото на ОС не знае за тяхното съществуване. Факта, че те се оправляват от потребителски процеси, прави предаването на контрола между всяка от тях евтина операция (от гледна точка на kernel-а, няма смяна на изпълняващата се нишка, съответно няма context switch). Главния недостатък на user нишките е при изпълнението на блокиращи операции (най-често свързани с IO операции) - при такава операция, вместо да се блокира само една нишка, се блокира целия процес.

Съществуват три модела за работа с нишки, определяни от съотношението на брой kernel threads към user threads

  • 1:1 - в този модел, на всяка нишка създадена от потребителя съотвества на kernel нишка

  • N:1 - в този модел, всяка нишка създадена от потребителя се съпоставя към една kernel нишка

  • M:N - в този модел, M на брой потребителски нишки се съпоставят към N на брой kernel нишки

Работа с нишки (Threads)#

За работа с нишки в Python използваме библиотеката threading. Тя ни предоставя различни инструменти за стартиране, синхронизиране и обща работа с нишки.

Основният клас с който ще работим, е класът Thread. Той има за цел да се погрижи за изпълнението на дадено парче питонски код, в отделна нишка. Можем да зададем кода, който искаме да изпълним по два начин - като Callable обект, подаден на конструктора на Thread, или като създадем собствен клас, който да наследи Thread и да презапишем run метода.

За да стартираме нишка, трябва да извикаме start метода - той има за цел да подготви нова нишка, в която да бъде изпълнено съдържанието на run метода.

from threading import Thread

def hi():
    print("Thread with a Callable in the init")

t1 = Thread(target=hi)
t1.start()

class MyThread(Thread):
    def run(self):
        print("Custom Thread class")

t2 = MyThread()
t2.start()
Thread with a Callable in the init
Custom Thread class

Тук е важно да се отбележи, че когато наследяваме от Thread, не трябва да предефинираме никакви други методи, освен __init__ и run. Можем да добавяме нови методи, но не трябва да променяме поведението на останалите методи в Thread.

from threading import Thread
from time import sleep

def delayed_hi():
    sleep(2)
    print("Overslept a bit... hi")

t1 = Thread(target=delayed_hi)
t1.start()
print("The early bird gets the worm")
The early bird gets the worm
Overslept a bit... hi

В горния пример, изпълнението на програмата продължава напред със следващите редове код. Програмата ни приключва тогава, когато всички нишки за приключили работа.

Ако искаме да изчакаме дадена нишка да приключи преди да продължим с изпълнението на кода, можем да използваме метода join.

from threading import Thread
from time import sleep

def delayed_hi():
    sleep(1)
    print("I'm a thread")

t1 = Thread(target=delayed_hi)
t1.start()

print("The thread has started")
print("I want to wait for it to end")
t1.join()  # Comment this, to see the change
print("The thread has finished")
The thread has started
I want to wait for it to end
I'm a thread
The thread has finished

Методът join спира изпълненето на кода, докато нишката не приключи работата си или не срещне Exception. Възможно е да подадем timeout параметър на join - той индикира колко време да изчакаме нишката да приключи. Важно е да отбележим, че след изтичането на timeout параметъра, нишката може да продължи да работи. С методът is_alive можем да проверим дали дадена нишка все още работи.

from threading import Thread
from time import sleep 

def sleepy_thread():
    print(">Going to sleep")
    sleep(20)
    print(">Did a really good nap")

t1 = Thread(target=sleepy_thread)
t1.start()

print(f"Is the thread working ? {t1.is_alive()}")

print("Let's give it some time")
sleep(2)

print("Time to get up !")
t1.join(timeout=3)

print(f"Is the thread working ? {t1.is_alive()}")
t1.join()

print(f"What about now ? {t1.is_alive()}")
>Going to sleep
Is the thread working ? True
Let's give it some time
Time to get up !
Is the thread working ? True
>Did a really good nap
What about now ? False

Друга важна особеност на Thread, е че ако нашата функция връща някакъв резутлат, той няма да ни бъде върнат, ако функцията се изпълнява през thread.

from threading import Thread

def return_a_value():
    return 5

t1 = Thread(target=return_a_value)

res = t1.start()
print(f"{res=}, {type(res)=}")

res = t1.join()
print(f"{res=}, {type(res)=}")
res=None, type(res)=<class 'NoneType'>
res=None, type(res)=<class 'NoneType'>

Нека разгледаме следния практически пример - имаме следната функция, която изчислява (до някаква степен) безкрайна сума. Като пример, ще вземем следната сума $\( \sum_{k=0}^{\infty} \frac{99^k}{(99(k+1))^{97}} \)$

Можем да напишеш функция, която изчислява част от реда - с начално \( k_{start} \) и крайно \( k_{end} \)

import math

def calculate_partial_sum(start: int, end: int) -> int:
    sum = 0
    for k in range(start, end + 1):
        numerator = (99 ** k)
        denominator = (99 * (k+1)) ** 97
        sum += numerator // denominator
    
    return sum

Нека изчислим първите 24000 члена на този ред

import time

start = time.time()
calculate_partial_sum(0, 24000)
end = time.time()

print(f'The call took {(end-start):.2f} seconds')
The call took 13.06 seconds

Забелязваме, че кода ни отнема ~13 секунди да пресметне първите 24000 члена да реда. В този си вариант, членове на реда не зависят един от друг - това ни позволява да ги пресмятаме на интервали, които можем да пуснем на няколко нишки.

Нека като първи пример пуснем две нишки - едната да изчисли първите 12000 члена, а втората, останалите.

import time
from threading import Thread

t1 = Thread(target=calculate_partial_sum, args=(0, 12000))
t2 = Thread(target=calculate_partial_sum, args=(12001, 24000))

start = time.time()
t1.start()
t2.start()

while t1.is_alive() or t2.is_alive():
    time.sleep(0.1)

end = time.time()
print(f'The 2 threads took {(end-start):.2f} seconds')
The 2 threads took 14.82 seconds

Забелязваме, че на две нишки, кодът ни се изпълни за ~15 секунди - изненада. Нека пробваме на 4 нишки, каква е ситуацията.

import time
from threading import Thread

t1 = Thread(target=calculate_partial_sum, args=(0, 6000))
t2 = Thread(target=calculate_partial_sum, args=(6001, 12000))
t3 = Thread(target=calculate_partial_sum, args=(12001, 18000))
t4 = Thread(target=calculate_partial_sum, args=(18001, 24000))

start = time.time()
t1.start()
t2.start()
t3.start()
t4.start()

while t1.is_alive() or t2.is_alive() or t3.is_alive() or t4.is_alive():
    time.sleep(0.1)

end = time.time()
print(f'The 4 threads took {(end-start):.2f} seconds')
The 4 threads took 21.37 seconds

С 4 нишки, нещата стават още по-бавни… Какво се случва ?

Python GIL#

Една от особенностите на Python, е т.нар. global interpreter lock (GIL) - накратко, само една нишка може да работи с интерпретатора на Python в даден момент. Това е направено с цел, да се реши проблема с промяната на променлива от една нишка, докато друга я достъпва. (По-конкретно, проблемът може да се появи, когато дадена памет трябва да се освободи, след като в едната нишка се стига до порция от кода, в който променливата вече не се използва).

Съществуването на GIL в Python улеснява писането на код - нашия код винаги е thread-safe (няма как две нишки да се борят за един и същ ресурс). Освен това, GIL позволява използването на C библиотеки, които не са предвидени за работа в многонишков режим.

Наличието на GIL в Python обаче означава, че нашия “многонишков” код всъщност не е многонишков - да, ние стартираме няколко нишки, но те се изпълняват конкурентно, а не паралелно. На практика това означава, че при задачи, в които разчитаме единствено на операциите на процесора, кодът ни ще се държи все едно се изпълнява на една нишка.

Това обаче е вярно за задачи, които са изцяло зависими от процесора - както казахме по-горе, понякога нашите задачи трябва да “чакат” за някакъв ресурс (например файл) - тогава интерпретатора ще отнеме контрола на чакащата нишка, и ще го предаде на следвщата нишка чакаща време за изпълнение. На практика това означава, че използването на нишки в Python би донесло подобрение във времето за изпълнение, независимо от GIL.

Ситуации в които това би било вярно, е когато работим с GUI - през повечето време нашата програма няма да прави CPU изчисления, и активната нишка ще е тази, върху която се изпълнява графичния интерфейс. Друг пример, е когато имаме задачи с много IO операции - писане или четене от файлове.

Ако искате да научите повече детайли за GIL, може да разгледате тази статия, или този клип.

Threads, round 2#

Освен начини за създаване на нишки, в threading библиотеката се намират и полезни инструменти за синхронизация между нишки. Синхронизацията между нишки представлява механизми, които осигуряват правилната работа със споделени ресурси между няколко нишки.

Семафори#

Първият обект който ще разгледаме, е семафорът. Семафорът съдържа брояч, който ако е по-голям от 0, позволява на дадена нишка да влезе в т.нар. критична секция (да достъпи споделен ресурс между нишки). С влизането в тази критична секция на една нишка, брояча се намаля с 1. Когато една нишка излезе от критичната секция “връща” семафора (увеличава неговия брояч с 1). Ако друга нишка стигне до тази критична секция, и брояча на семафора е 0, нишката ще трябва да изчака докато брояча е отново по-голям от 0.

В Python, семафора е реализиран в Semaphore класа. Той съдържа два основни метода - acquire и release. acquire метода проверява дали брояча е със стойност по-голяма от 0. Ако е така, намалява брояча с 1 и продължава напред. Ако брояча обаче е със стойност 0, изчаква докато брояча не стане > 1. release метода пък увеличава стойността на брояча с 1.

Ще разгледаме следния пример - имаме две нишки, всяка от която ще пише в общ списък. За да гарантираме, че във всеки даден момент само една нишка ще пише в списъка, ще използваме семафор.

import threading

def add_another_item_to_list(item: int, items: list[int], semaphore: threading.Semaphore):
    print("Waiting to write in the list")

    semaphore.acquire()
    items.append(item)
    semaphore.release()

    print("Wrote successfully in the list")

semaphore = threading.Semaphore(1)  # Стойността подадена в конструктора е началната стойност на брояча на семафора
items = []
t1 = threading.Thread(target=add_another_item_to_list, args=(1, items, semaphore))
t2 = threading.Thread(target=add_another_item_to_list, args=(2, items, semaphore))

t1.start()
t2.start()

print(items)
Waiting to write in the list
Wrote successfully in the list
Waiting to write in the list
Wrote successfully in the list
[1, 2]

Можем да използваме семафори и да подсигурим, че една нишка ще се изпълни преди която и да е друга. Нека този пъти имаме 3 нишки, и искаме да подсигурим, че нишка 1 ще се изпълни винаги преди 2 и 3.

import random
import threading

def add_another_item_to_list(item: int, items: list[int], semaphore: threading.Semaphore, is_first_thread: bool = False):
    print(f"Waiting to write {item} in the list")

    if not is_first_thread:
        semaphore.acquire()

    items.append(item)
    semaphore.release()

    print(f"Successfully wrote {item} in the list")

semaphore = threading.Semaphore(0)  # Стойността подадена в конструктора е началната стойност на брояча на семафора
items = []

t1 = threading.Thread(target=add_another_item_to_list, args=(1, items, semaphore, True))
t2 = threading.Thread(target=add_another_item_to_list, args=(2, items, semaphore))
t3 = threading.Thread(target=add_another_item_to_list, args=(3, items, semaphore))

threads = [t1, t2, t3]
random.shuffle(threads)

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()
    
print(items)
Waiting to write 3 in the list
Waiting to write 2 in the list
Waiting to write 1 in the list
Successfully wrote 1 in the list
Successfully wrote 3 in the list
Successfully wrote 2 in the list
[1, 3, 2]

Event#

Нека разгледаме следната ситуация - имаме N на брой нишки, като първата от тях трябва да изпълни някакъв код, преди да могат да продължат другите. За цел синхронизация, трябва да имаме механизъм, чрез който всичко освен първата нишка да изчакат, докато първата нишка не каже че е приключила.

Един начин по който можем да имплементираме това е със специален флаг - докато флага има стойност False, нашите нишки няма да се изпълняват. Когато флага стане True, имаме зелена светлина за изпълнението на останалите нишки.

Python ни предлага удобен механизъм за решаването на този проблем - Event класа. Той разполага със следните методи:

  • is_set() - проверява дали флага е със стойност True

  • set() - задава стойност на флага True

  • clear() - изчиства стойноста на флага (задава се стойност False)

  • wait() - блокира текущата нишка, докато флага не стане True

Нека разгледаме примера, в който една нишка ще направи някакви изчисления, ще ги запази в променлива, а друга нишка ще използват тази стойност.

import threading
import time

constant = 0

def calculate_variable(event_signalizer: threading.Event):
    print("Calculating the variable...")
    time.sleep(1)

    global constant
    constant = 42
    print("Done !")
    event_signalizer.set()

def other_calculations(event_signalizer: threading.Event):
    print("Starting other calculations...")
    event_signalizer.wait()
    result = constant ** 2
    print(f"The result is {result}")

event = threading.Event()
t1 = threading.Thread(target=calculate_variable, args=(event,))
t2 = threading.Thread(target=other_calculations, args=(event, ))

t1.start()
t2.start()

t1.join()
t2.join()
Calculating the variable...
Starting other calculations...
Done !
The result is 1764

“Многопроцесност”#

https://docs.python.org/3.10/library/multiprocessing.html#module-multiprocessing

Един начин да заобиколим GIL-а, е да използваме множество процеси - така всяка задача има собствен Python интерпретатор (съответно и собствен GIL). multiprocessing библиотеката предоставя подобни на threading функции, чрез които можем да реализираме истинска паралелност.

Process#

Можем да стартираме нов процес, като инстанцираме обект от тип Process - той работи по същия начин както Thread класа.

from multiprocessing import Process

def hello_from_the_other_side(name: str):
    print(f"Hello, {name}")

p1 = Process(target=hello_from_the_other_side, args=("Lyubo", ))
p1.start()
p1.join()
Hello, Lyubo

Нека повторим примера с пресмятането на реда от по-горе, но този път ще използваме Process обекти, вместо Thread такива.

import math

def calculate_partial_sum(start: int, end: int) -> int:
    sum = 0
    for k in range(start, end + 1):
        numerator = (99 ** k)
        denominator = (99 * (k+1)) ** 97
        sum += numerator // denominator
    
    return sum
import time

start = time.time()
calculate_partial_sum(0, 24000)
end = time.time()

print(f'The call took {(end-start):.2f} seconds')
The call took 12.72 seconds
import time
from multiprocessing import Process

t1 = Process(target=calculate_partial_sum, args=(0, 12000))
t2 = Process(target=calculate_partial_sum, args=(12001, 24000))

start = time.time()
t1.start()
t2.start()

while t1.is_alive() or t2.is_alive():
    time.sleep(0.1)

end = time.time()
print(f'The 2 processes took {(end-start):.2f} seconds')
The 2 processes took 10.52 seconds
import time
from multiprocessing import Process

t1 = Process(target=calculate_partial_sum, args=(0, 6000))
t2 = Process(target=calculate_partial_sum, args=(6001, 12000))
t3 = Process(target=calculate_partial_sum, args=(12001, 18000))
t4 = Process(target=calculate_partial_sum, args=(18001, 24000))

start = time.time()
t1.start()
t2.start()
t3.start()
t4.start()

while t1.is_alive() or t2.is_alive() or t3.is_alive() or t4.is_alive():
    time.sleep(0.1)

end = time.time()
print(f'The 4 processes took {(end-start):.2f} seconds')
The 4 processes took 6.62 seconds

Макар и не с много, виждаме подобрение във времето при скалирането от 1 процес на 2 процеса, и от 2 на 4 - това забавяне идва от предимно от баланса на задачите - втория процес има по-сложна задача, затова му отнема повече време.

Комуникация между процеси#

Имаме няколко варианта за комуникация между процесите - Queue, pipe, споделена памет и т.нар. “сървърен процес”

Queue#

Можем да създадем опашка, която да бъде споделена между процеси - тя е синхронизирана, т.е. няма риск от race conditions.

import random
from multiprocessing import Process, Queue

def put_in_the_queue(q):
    q.put(random.randint(0, 10))

q = Queue()
p1 = Process(target=put_in_the_queue, args=(q, ))
p2 = Process(target=put_in_the_queue, args=(q, ))

p1.start()
p2.start()

p1.join()
p2.join()

print(f'Amount of items in the Queue: {q.qsize()}')

while q.qsize() > 0:
    print(f'Item in the Queue: {q.get()}')
Amount of items in the Queue: 2
Item in the Queue: 10
Item in the Queue: 4

Pipe#

Вместо опашка, която е обща за всички процеси/нишки, можем да използваме “тръба” (пайп), свързана към два процеса/нишки. Тя позволява двупосочна комуникация между процеси. Извикването на Pipe ни връща два обекта - пайп през който можем да изпращаме данни, и пайп през който можем да получаваме данни. С подаване на duplex аргумента със стойност True, двата пайпа получават възможността за четене и писане. Чрез методите send и recv може да изпращаме и получаваме съобщения по Pipe-а. recv блокира процеса/нишката, докато не се получи съобщение, което да се обработи.

import time
from multiprocessing import Process, Pipe, connection

def wait_and_send(pipe: connection.Connection):
    print("Sending a message")
    time.sleep(1)
    pipe.send(42)

def receive_and_print(pipe: connection.Connection):
    msg = pipe.recv()
    print(f"Received a message: {msg}")

send_pipe, receive_pipe = Pipe()
p1 = Process(target=wait_and_send, args=(send_pipe, ))
p2 = Process(target=receive_and_print, args=(receive_pipe, ))

p1.start()
p2.start()

p1.join()
p2.join()
Sending a message
Received a message: 42

Shared memory#

Ако пък искаме да използваме директно споделена памет под някакъв вид (променлива или масив), можем да заделим така с помощта на класовете Value и Array.

from multiprocessing import Process, Value

def calculate_area(radius: float, pi: Value, result: Value):
    result.value = pi.value * (radius ** 2)

pi = Value('f', 3.14)
result = Value('f')

p1 = Process(target=calculate_area, args=(3, pi, result))
p1.start()
p1.join()

print(f'The result is {result.value}')
The result is 28.260000228881836
import random
from multiprocessing import Process, Array, Value

def add_to_array(arr: Array, free_index: Value):
    arr[free_index.value] = random.randint(0, 100)
    free_index.value = free_index.value + 1

free_index = Value('i', 0)
arr = Array('i', 5)

p1 = Process(target=add_to_array, args=(arr, free_index))
p2 = Process(target=add_to_array, args=(arr, free_index))
p3 = Process(target=add_to_array, args=(arr, free_index))
p4 = Process(target=add_to_array, args=(arr, free_index))
p5 = Process(target=add_to_array, args=(arr, free_index))

procs = [p1, p2, p3, p4, p5]

for proc in procs:
    proc.start()

for proc in procs:
    proc.join()

print(f'The array contains {len(arr)} items')

for i in arr:
    print(i)
The array contains 5 items
3
25
82
51
96

Асинхронно програмиране с asyncio#

Какво е “асинхронно програмиране”?#

Асинхронното програмиране е парадигма, която има имплементация във все повече и повече езици за програмиране (напр. C#, Swift, Javascript, …).

Идеята е проста: докато чакаме приключването на някаква I/O операция (т.е. такава, която не е CPU-bound) да дадем възможност на други задачи да се изпълняват. Това може да се управлява от т.нар. “run loop”, който да върви на една единствена нишка.

Аналогия за асинхронност от реалния свят може да бъде например една дюнерджийница. В този пример IO-bound операцията е стоплянето на храната на скарата - докато това се случва, дюнерджията (нишката) може да се заеме с други задачи - да приеме поръчки, плащания, да приготвя други поръчки и т.н. Без тази асинхронност 5 поръчки щяха да отнемат 5 пъти повече време (като игнорираме добавения overhead от плащане, слагане на салати и т.н.).

Какво са примери за IO операции и за CPU-bound операции?#

  • IO операции:

    • Четене и писане на файлове (в т.ч. бази данни)

    • Мрежови заявки

    • Изпълнение на команди към операционната система

    • всичко друго, което не ангажира процесора през цялото време

  • CPU-bound операции:

    • Математически изчисления

    • Сортиране на списъци

    • Джуркане на матрици

    • всичко друго, което ангажира процесора през цялото време

Асинхронното програмиране няма как да забърза CPU-bound операциите. За тях са необходими многонишкови решения, или конкретно в Python заради GIL - единственото решение е използването на много паралелни процеси (multiprocessing).

Какви са съставните части на асинхронното програмиране?#

  • run loop: това е “двигателят” на асинхронното програмиране. Той следи за събития (напр. приключване на IO операция) и изпълнява кода, който е свързан с тях.

  • корутини (coroutines), Task-ове и Future-и: с тях дефинираме “задачите”, които се изпълняват асинхронно:

    • корутините са функции, които могат да бъдат спрени и продължени по време на изпълнение (подобно на генераторите в Python, но с известна разлика)

    • Task-овете са обектите, които съдържат корутините и се изпълняват от run loop-а

    • Future-ите е общият интерфейс за асинхронните операции

  • ключовите думи async и await:

    • с async декларираме, че дадена функция е корутина (в Python също служи и за деклариране на асинхронни цикли и контекстни мениджъри)

    • с await предаваме контрола на run loop-а, когато стигнем до IO операция. Изполва се само в async среда.

asyncio#

https://docs.python.org/3.10/library/asyncio.html#module-asyncio

Асинхронното програмиране в Python е възможно във версии >= 3.4 посредством вградения модул asyncio и ключовите думи async и await.

⚠️ Имайте предвид, че (особено между версии 3.4 и 3.7) има много промени в синтаксиса и функционалностите на asyncio, така че не вярвайте сляпо на ChatGPT или StackOverflow. Ако пише нещо от рода на @asyncio.couroutine или yield from ... - това не важи вече. Също има и промени в имената на функции и т.н. Винаги double-check-вайте в официалната документация, ако не сте сигурни за нещо.

Корутини#

Теоретически най-коректната дефиниця на корутина е, че тя е генератор, който освен да генерира стойности, може и да получава стойности.

Няма да навливаме в повече детайли от по-ниско ниво. За целите на asyncio и асинхронното програмиране можем да разглеждаме в Python корутините като функции или генератори (понеже е възможно да има и yield в тялото им), декларирани с async def, които могат да предават контрола на run loop-a (за изпълнение на други future-и докато се чака) чрез await.

Нека си направим един прост coroutine, който просто да чака изпълнението на някаква IO операция между два print-а.

За простота ще използваме asyncio.sleep, но можете да лесно да си представете, че на негово място примерно чете огромен файл или чака отговор от сървър например.

import asyncio  # нужно ни е за фунциите вътре, иначе ако само пишем async и await ключови думи няма нужда от импорт
async def some_coroutine():
    print("Starting the operation")
    await asyncio.sleep(5)
    print("Operation finished")

⚠️ Не използвайте time.sleep в корутини! Това ще блокира целия run loop и ще забави изпълнението на всички останали корутина. Винаги използвайте asyncio.sleep за асинхронно чакане.

Хубаво, пише async и await, но каква е разликата с това да ги няма?

import time

def some_function():
    print("Starting the operation")
    time.sleep(5)
    print("Operation finished")

Ако просто изчакваме корутините подред, няма да има разлика във времето за изпълнение:

await some_coroutine()
await some_coroutine()

# Забележка: ако това го напишем като top-level code във файл, няма да можем да го изпълним просто така.
# Тогава ще трябва да го сложим в друг coroutine (async def) и да го извикаме с `asyncio.run(името_на_новия_coroutine())`

# Jupyter notebooks обаче ни позволяваt имплицитна top-level async среда.
Starting the operation
Operation finished
Starting the operation
Operation finished
some_function()
some_function()
Starting the operation
Operation finished
Starting the operation
Operation finished

И в двата случая отнема 10 секунди, защото всеки coroutine чака 5 секунди, и ги извикваме един след друг.

asyncio.gather#

Ако искаме да ги пуснем едновременно, един начин е чрез asyncio.gather:

await asyncio.gather(some_coroutine(), some_coroutine())
Starting the operation
Starting the operation
Operation finished
Operation finished
[None, None]

asyncio.gather обединява множество future-и в един и накрая връща списък с резултатите им (в реда, в който са подадени).

⚠️ Обърнете внимание, че подаваме не просто имената на корутините, а ги извикваме с (). Това е защото един async def на практика е coroutine function, и чак като бъде извикан, тогава връща самия coroutine object. Терминологията обаче обикновено се смесва и двете понятия се наричат “coroutine”.

Чак след указване на await върху coroutine object-a започва изпълнението му (правете аналогия с генераторите и next).

some_coroutine()
<coroutine object some_coroutine at 0x000001CC2C09A680>

Полезна илюстрация за това е coroutine, който приема аргументи:

async def tagged_coroutine(tag: str | int, sleep_secs: int):
    print(f"Starting the operation with tag {tag}, sleeping for {sleep_secs} seconds")
    await asyncio.sleep(sleep_secs)
    print(f"Operation with tag {tag} finished")
    return f"dummy_result_of_tag_{tag}"
coroutines = (tagged_coroutine(tag=i, sleep_secs=5-i) for i in range(5))
await asyncio.gather(*coroutines)
Starting the operation with tag 0, sleeping for 5 seconds
Starting the operation with tag 1, sleeping for 4 seconds
Starting the operation with tag 2, sleeping for 3 seconds
Starting the operation with tag 3, sleeping for 2 seconds
Starting the operation with tag 4, sleeping for 1 seconds
Operation with tag 4 finished
Operation with tag 3 finished
Operation with tag 2 finished
Operation with tag 1 finished
Operation with tag 0 finished
['dummy_result_of_tag_0',
 'dummy_result_of_tag_1',
 'dummy_result_of_tag_2',
 'dummy_result_of_tag_3',
 'dummy_result_of_tag_4']

Обърнете внимание, че ако сложим await в comprehension, няма да има concurrency:

coroutines = [tagged_coroutine(tag=i, sleep_secs=3-i) for i in range(3)]
results = [await result for result in coroutines]
results
Starting the operation with tag 0, sleeping for 3 seconds
Operation with tag 0 finished
Starting the operation with tag 1, sleeping for 2 seconds
Operation with tag 1 finished
Starting the operation with tag 2, sleeping for 1 seconds
Operation with tag 2 finished
['dummy_result_of_tag_0', 'dummy_result_of_tag_1', 'dummy_result_of_tag_2']

async for и асинхронни генератори#

Concurrency няма да има и с итериране на асинхронни генератори. Тях итерираме с async for. Можем да ги създаваме по тва начина:

  • функционален: с async def, в койto има yield

  • ООП: с дефиниране на __aiter__ и __anext__ дъндърите

async def async_generator(n: int):
    for i in range(n):
        result = await tagged_coroutine(tag=i, sleep_secs=n-i)
        yield result

resluts = [result async for result in async_generator(3)]
print(resluts)
Starting the operation with tag 0, sleeping for 3 seconds
Operation with tag 0 finished
Starting the operation with tag 1, sleeping for 2 seconds
Operation with tag 1 finished
Starting the operation with tag 2, sleeping for 1 seconds
Operation with tag 2 finished
['dummy_result_of_tag_0', 'dummy_result_of_tag_1', 'dummy_result_of_tag_2']

asyncio.as_completed#

Ако искаме да обработваме резултатите от future-и в реда на приключването им, можем да използваме asyncio.as_completed. Връща ни Future, който можем да await-нем на всяка итерация:

from asyncio import as_completed

coroutines = [tagged_coroutine(tag=i, sleep_secs=5-i) for i in range(5)]
results = [await result for result in as_completed(coroutines)]

results
Starting the operation with tag 1, sleeping for 4 seconds
Starting the operation with tag 0, sleeping for 5 seconds
Starting the operation with tag 2, sleeping for 3 seconds
Starting the operation with tag 3, sleeping for 2 seconds
Starting the operation with tag 4, sleeping for 1 seconds
Operation with tag 4 finished
Operation with tag 3 finished
Operation with tag 2 finished
Operation with tag 1 finished
Operation with tag 0 finished
['dummy_result_of_tag_4',
 'dummy_result_of_tag_3',
 'dummy_result_of_tag_2',
 'dummy_result_of_tag_1',
 'dummy_result_of_tag_0']

asyncio.create_task#

Ако не ни трябва да ги изчакваме в текущия coroutine, можем да ги schedule-нем с create_task.

⚠️ При този начин е важно полученият Task обект да му запазим референция някъде извън текущия scope (например в атрибут на клас или като глобална променлива), защото иначе ще бъде зачистен от garbage collector-a и прекратен.

from asyncio import create_task

tasks = set()

def sync_function():
    print("I'm a sync function.")
    for i in range(5):
        task = create_task(tagged_coroutine(tag=i, sleep_secs=5-i))
        tasks.add(task)
    print("I shall not wait for the tasks to finish. Or to even start.")

sync_function()

# ...

await asyncio.sleep(8)

if len(tasks) > 0:
    print("If you don't await the tasks, at least clean them afterwards, jeez!")
    for task in tasks:
        if not task.done():
            task.cancel()
    tasks = set()
I'm a sync function.
I shall not wait for the tasks to finish. Or to even start.
Starting the operation with tag 0, sleeping for 5 seconds
Starting the operation with tag 1, sleeping for 4 seconds
Starting the operation with tag 2, sleeping for 3 seconds
Starting the operation with tag 3, sleeping for 2 seconds
Starting the operation with tag 4, sleeping for 1 seconds
Operation with tag 4 finished
Operation with tag 3 finished
Operation with tag 2 finished
Operation with tag 1 finished
Operation with tag 0 finished
If you don't await the tasks, at least clean them afterwards, jeez!
tasks = set()

for i in range(5):
    task = create_task(tagged_coroutine(tag=i, sleep_secs=5-i))
    tasks.add(task)

[await task for task in tasks]  # tasks can be awaited too
Starting the operation with tag 0, sleeping for 5 seconds
Starting the operation with tag 1, sleeping for 4 seconds
Starting the operation with tag 2, sleeping for 3 seconds
Starting the operation with tag 3, sleeping for 2 seconds
Starting the operation with tag 4, sleeping for 1 seconds
Operation with tag 4 finished
Operation with tag 3 finished
Operation with tag 2 finished
Operation with tag 1 finished
Operation with tag 0 finished
['dummy_result_of_tag_2',
 'dummy_result_of_tag_4',
 'dummy_result_of_tag_3',
 'dummy_result_of_tag_1',
 'dummy_result_of_tag_0']

Task-овете могат да бъдат спирани с .cancel(). Един task има метод .done(), който връща True, ако задачата е приключила.

Приключването на Task Може да стане по три начина:

  1. Успешно изпълнение

  2. Грешка

  3. Отменяне (чрез cancel отвън или вдигане на CancellationError отвътре)

С .result(), .exception() и .cancelled() можем да вземем резултата, грешката или да проверим дали е била отменена задачата, като ако състоянието е различно от очакваното се вдига подходяща грешка. Повече инфо тук.

def debug_task(task: Task, seconds: int):
    header = f"{seconds} seconds: "

    print(f"{header}{task.done() = }")
    try:
        print(f"{header}{task.result() = }")
    except asyncio.InvalidStateError:
        print(f"{header}task.result() raised asyncio.InvalidStateError")
    except asyncio.CancelledError:
        print(f"{header}task.result() raised asyncio.CancelledError")
    except Exception as e:  
        print(f"{header}task.result() raised {e}")


task = create_task(tagged_coroutine(tag="long coro", sleep_secs=69420))

for second in range(5):
    debug_task(task, second)
    await asyncio.sleep(1)

task.cancel()
debug_task(task, 5)

await asyncio.sleep(1)  # it takes some time to cancel the task!
debug_task(task, 6)
0 seconds: task.done() = False
0 seconds: task.result() raised asyncio.InvalidStateError
Starting the operation with tag long coro, sleeping for 69420 seconds
1 seconds: task.done() = False
1 seconds: task.result() raised asyncio.InvalidStateError
2 seconds: task.done() = False
2 seconds: task.result() raised asyncio.InvalidStateError
3 seconds: task.done() = False
3 seconds: task.result() raised asyncio.InvalidStateError
4 seconds: task.done() = False
4 seconds: task.result() raised asyncio.InvalidStateError
5 seconds: task.done() = False
5 seconds: task.result() raised asyncio.InvalidStateError
6 seconds: task.done() = True
6 seconds: task.result() raised asyncio.CancelledError

Асинхронни контекст мениджъри (async with)#

В много библиотеки, които използват asyncio, се изисква да се използва async with за да се гарантира правилното отваряне и затваряне на ресурси (файлове, сокети, т.н.).

Както с with използваме context manager, така и с async with използваме асинхронен контекстен мениджър, без друга разлика в синтаксиса.

Единият (ООП) начин да си направим наш си асинхронен контекстен мениджър, като дефинираме __aenter__ и __aexit__ методите:

from asyncio import sleep


class SomeResource:
    @classmethod
    async def acquire(cls) -> 'SomeResource':
        print("Acquiring the resource...", end=" ")
        await sleep(1)
        print("OK.")
        return cls()

    async def some_method(self):
        print("Doing something with the resource...", end=" ")
        await sleep(1)
        print("OK.")

    async def release(self):
        print("Releasing the resource...", end=" ")
        await sleep(1)
        print("OK.")
class AsyncContextManager:
    def __init__(self):
        self._resource: SomeResource | None = None
    
    async def __aenter__(self) -> SomeResource:
        if self._resource is not None:
            raise IOError("Resource already acquired.")
    
        res = await SomeResource.acquire()
        self._resource = res
        return res

    async def __aexit__(self, exc_type, exc, tb):  # just like __exit__ it accepts the exception type, exception and traceback, if any.
        if self._resource is not None:
            await self._resource.release()
            self._resource = None


# Usage example:

async with AsyncContextManager() as res:
    await res.some_method()
Acquiring the resource... OK.
Doing something with the resource... OK.
Releasing the resource... OK.

Другият (функционален) начин е чрез contextlib.asynccontextmanager декоратора. С него трябва да декорираме асинхронен генератор, който yield-ва един единствен път. Кодът преди yield-а се изпълнява като __aenter__, а след yield-а - като __aexit__.

from contextlib import asynccontextmanager


@asynccontextmanager
async def async_context_manager():
    res = await SomeResource.acquire()
    try:
        yield res
    finally:
        await res.release()



# Usage example:

async with async_context_manager() as res:
    await res.some_method()
Acquiring the resource... OK.
Doing something with the resource... OK.
Releasing the resource... OK.

Литература за asyncio#

asyncio е обширна тема, която няма да покрием изцяло в курса. Оставяме няколко полезни линка, за тези които искат да навлязат по-надълбоко с asyncio.

Популярни библиотеки, използващи asyncio#

  • aiofiles: за работа с файлове (локални)

  • aiohttp: за мрежови заявки (с интерфейс, подобен на requests)

  • доста Python SDKs (за Azure например) или Python имплементации на API-та (discord.py например)

  • много съществуващи библиотеки за работа с определени бази данни или сървъри (като psycopg, boto3, т.н.) имат асинхронни версии (съотв. aiopg, aioboto3, т.н.). Някои от тях са по-поддържани, други - не толкова, трети - въобще не.

Примери след лекцията#

Пример 1#

Нека имаме функция, която умножава две матрици.

Нека напишем функция, която умножава две матрици паралелно. Нека използваме multiprocessing библиотеката за целта.

import time
def timeit():
    def wrapper(func):
        def inner(*args, **kwargs):
            start = time.time()
            result = func(*args, **kwargs)
            end = time.time()
            print(f'The function took {(end-start):.2f} seconds')
            return result
        return inner
    return wrapper


@timeit()
def multiply_matrix(a: list[list[int]], b: list[list[int]]) -> list[list[int]]:
    size_of_a = (len(a), len(a[0]))
    size_of_b = (len(b), len(b[0]))

    if size_of_a[1] != size_of_b[0]:
        raise ValueError("The matrixes cannot be multiplied")
    
    m, n, p = size_of_a[0], size_of_a[1], size_of_b[1]

    result = [[0 for _ in range(p)] for _ in range(m)]

    for i in range(m):
        for j in range(p):
            result[i][j] = sum(a[i][k] * b[k][j] for k in range(n))

    return result

Решение на пример 1#

from multiprocessing import Array, Process

@timeit()
def multithreaded_multiply_matrix(a: list[list[int]], b: list[list[int]]) -> list[list[int]]:
    size_of_a = (len(a), len(a[0]))
    size_of_b = (len(b), len(b[0]))

    if size_of_a[1] != size_of_b[0]:
        raise ValueError("The matrixes cannot be multiplied")
    
    m, n, p = size_of_a[0], size_of_a[1], size_of_b[1]
    

    result = Array('i', m * p)

    threads = [Process(target=multiply_row, args=(result, n, p, i, a, b)) for i in range(m)]

    for thread in threads:
        thread.start()

    for thread in threads:
        thread.join()

    return [result[i*p:(i+1)*p] for i in range(m)]

def multiply_row(shared_memory: Array, n: int, p: int, i: int, a: list[list[int]], b: list[list[int]]):
    for j in range(p):
        target_index = i * p + j
        shared_memory[target_index] = sum(a[i][k] * b[k][j] for k in range(n))
from random import randint

m, n, p = 400, 400, 400
value_range = (0, 10)

a = [[randint(*value_range) for _ in range(n)] for _ in range(m)]
b = [[randint(*value_range) for _ in range(p)] for _ in range(n)]

res_1 = multiply_matrix(a, b)
res_2 = multithreaded_multiply_matrix(a, b)
The function took 3.87 seconds
The function took 0.99 seconds

Пример 2#

Нека напишем функция parallel_file_search, която приема път до файл, низ по който търсим и брой на нишките, които ще използваме за търсенето. Функцията трябва да запише редовете, които съдържат низа в нов файл.

Файла трябва да се раздели на N части, в които да се търси паралелно.

Казваме, че един ред съдържа търсения низ, ако той се среща в него.

Решение на пример 2#

from multiprocessing import Process, Semaphore

def search_in_lines(lines: list[str], to_search: str,  output_path: str, semaphore: Semaphore):
    for line in lines:
        if to_search in line:
            results.append(line)

@timeit()
def parallel_file_search(input_path: str, to_search: str, n: int, output_path: str) -> list[str]:
    with open(input_path, encoding='utf-8') as input_file_descriptor:
            amount_of_lines = len(input_file_descriptor.readlines())
            
    chunk_size = amount_of_lines // n
    regions = [(i * chunk_size, (i+1) * chunk_size) for i in range(n - 1)] + [((n - 1) * chunk_size, amount_of_lines + 1)]

    procs = [Process(target=search_string_in_file, args=(to_search, input_path, region, output_path)) for region in regions]

    for proc in procs:
        proc.start()

    for proc in procs:
        proc.join()

def search_string_in_file(string: str, file_path: str, region: tuple[int, int], output_file_path: str, semaphore: Semaphore):
        results = []
        start, end = region
        with open(file_path, encoding='utf-8') as input_file_descriptor:
            for line_number, line in enumerate(input_file_descriptor):
                if start <= line_number < end:
                    column = line.find(string)
                    
                    while column != -1:
                        results.append(line.strip())
                        column = line.find(string, column+1)

        semaphore.acquire()

        with open(output_file_path, encoding='utf-8', mode='w+') as output_file_descriptor:
            for result in results:
                output_file_descriptor.write(result + '\n')

        semaphore.release()

Пример 3#

Напишете клас Scheduler, който да приема coroutine function (за улеснение приемете, че тя не приема аргументи), и да започне да го изпълнява на всеки interval секунди, след като бъде извикан метода start. Scheduler-ът да може да бъде спиран от метод stop.

Интерфейс:

from typing import Callable, Coroutine


class Scheduler:
    """Schedule a coroutine to start running periodically with a fixed time interval."""

    def __init__(self, coroutine_func: Callable[[], Coroutine], interval: int):
        ...

    def start(self):
        ...

    def stop(self):
        ...

Примерна употреба:

scheduler = Scheduler(some_coroutine, 10)

scheduler.start()
await asyncio.sleep(30)
scheduler.stop()

Решение на пример 3#

from asyncio import Task, create_task, sleep
from typing import Callable, Coroutine


class Scheduler:
    """Schedule a coroutine to start running periodically with a fixed time interval."""

    def __init__(self, coroutine_func: Callable[[], Coroutine], interval: int):
        self.interval = interval
        self._coroutine_func = coroutine_func
        self._scheduler_task: Task | None = None
        self._coroutine_task: Task | None = None

    def start(self):
        self._scheduler_task = create_task(self._run())

    async def _run(self):
        while True:
            self._running_task = create_task(self._coroutine_func())
            # primitive and error-prone but just for illustration purposes
            await sleep(self.interval)

    def stop(self):
        if self._scheduler_task is not None:
            self._scheduler_task.cancel()
        if self._coroutine_task is not None:
            self._coroutine_task.cancel()


# Usage example:

async def some_coroutine():
    print("Starting the operation")
    await sleep(5)
    print("Operation finished")

scheduler = Scheduler(some_coroutine, 10)

scheduler.start()
await asyncio.sleep(30)
scheduler.stop()
Starting the operation
Operation finished
Starting the operation
Operation finished
Starting the operation
Operation finished