Advanced Asyncio Features

Synchronization Primitives: Locks, Events, Semaphores

Asyncio provides synchronization primitives to manage access to shared resources among coroutines. These include locks, events, and semaphores.

  1. Locks: Ensure that only one coroutine accesses a shared resource at a time.
import asyncio

lock = asyncio.Lock()

async def critical_section():
    async with lock:
        print("Lock acquired")
        await asyncio.sleep(1)
        print("Lock released")

async def main():
    # The second coroutine waits until the first one releases the lock.
    await asyncio.gather(critical_section(), critical_section())

asyncio.run(main())
  2. Events: Let one or more coroutines wait until another coroutine signals that something has happened.
import asyncio

event = asyncio.Event()

async def waiter():
    print("Waiting for event")
    await event.wait()
    print("Event received")

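async def main():
    waiter_task = asyncio.create_task(waiter())
    await asyncio.sleep(2)
    event.set()  # wake every coroutine blocked in event.wait()
    await waiter_task

asyncio.run(main())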
  3. Semaphores: Limit the number of concurrent accesses to a resource.
import asyncio

semaphore = asyncio.Semaphore(2)

async def access_resource():
    async with semaphore:
        print("Resource accessed")
        await asyncio.sleep(1)

async def main():
    # Only two of the three coroutines hold the semaphore at any moment.
    await asyncio.gather(access_resource(), access_resource(), access_resource())

asyncio.run(main())
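
The three primitives above are often used together. The following is a minimal sketch, not a definitive pattern: the names worker, run_workers, and the state dictionary are hypothetical and exist only for illustration. A semaphore caps how many workers run the simulated I/O step at once, and a lock protects a read-modify-write update that contains an await. The primitives are created inside the running coroutine, which also avoids binding them to the wrong event loop on older Python versions.

```python
import asyncio


async def worker(semaphore, lock, state):
    # At most `limit` workers may be inside this block at once.
    async with semaphore:
        state["active"] += 1
        state["peak"] = max(state["peak"], state["active"])
        await asyncio.sleep(0.01)  # stand-in for real I/O
        state["active"] -= 1

    # Read-modify-write with an await in the middle: without the lock,
    # concurrent workers could overwrite each other's updates.
    async with lock:
        current = state["total"]
        await asyncio.sleep(0)  # yield to the event loop mid-update
        state["total"] = current + 1


async def run_workers(count=6, limit=2):
    # Hypothetical helper: builds the primitives and runs `count` workers.
    semaphore = asyncio.Semaphore(limit)
    lock = asyncio.Lock()
    state = {"active": 0, "peak": 0, "total": 0}
    await asyncio.gather(*(worker(semaphore, lock, state) for _ in range(count)))
    return state


if __name__ == "__main__":
    print(asyncio.run(run_workers()))
```

The "peak" entry records the highest number of workers seen inside the semaphore block, so it should never exceed the limit, while "total" should equal the number of workers because the lock serializes the update.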

Using Queues for Task Management

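An Asyncio queue passes items between coroutines. Its put() and get() methods are awaitable, so a producer pauses when a bounded queue is full and a consumer pauses when the queue is empty. This makes queues a natural fit for producer-consumer scenarios.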

import asyncio

queue = asyncio.Queue()

async def producer():
    for i in range(5):
        await queue.put(i)
        print(f"Produced {i}")
        await asyncio.sleep(1)

async def consumer():
    while True:
        item = await queue.get()
        if item is None:
            # None is the sentinel that tells the consumer to stop.
            break
        print(f"Consumed {item}")
        await asyncio.sleep(2)

async def main():
    producer_task = asyncio.create_task(producer())
    consumer_task = asyncio.create_task(consumer())
    await producer_task
    # Signal the consumer to exit once all items have been produced.
    await queue.put(None)
    await consumer_task

asyncio.run(main())
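
The sentinel approach above works well with a single consumer. With several consumers, one alternative is to track completion with queue.task_done() and queue.join() and then cancel the idle consumers. The sketch below assumes that approach; consumer, run_pipeline, and the results list are hypothetical names chosen for illustration, and the short sleep stands in for real work.

```python
import asyncio


async def consumer(name, queue, results):
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0.01)  # stand-in for real work
            results.append((name, item))
        finally:
            # Tell the queue this item has been fully processed.
            queue.task_done()


async def run_pipeline(n_items=5, n_consumers=2):
    queue = asyncio.Queue(maxsize=2)  # bounded: put() waits when full
    results = []
    consumers = [
        asyncio.create_task(consumer(f"consumer-{i}", queue, results))
        for i in range(n_consumers)
    ]
    for i in range(n_items):
        await queue.put(i)
    # join() returns once every item has been marked done.
    await queue.join()
    for task in consumers:
        task.cancel()
    # Collect the cancellations so no task is left pending.
    await asyncio.gather(*consumers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(asyncio.run(run_pipeline()))
```

Because the queue is bounded, the producing loop slows down automatically when the consumers fall behind, which keeps memory use predictable.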

Asyncio and Multiprocessing

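A CPU-bound function blocks the event loop, so no other coroutine can run until it returns. Offloading the work to a process pool with loop.run_in_executor() keeps the loop responsive while the computation runs in a separate process.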

import asyncio
import concurrent.futures

def cpu_bound_task(n):
    return sum(i * i for i in range(n))

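async def main():
    loop = asyncio.get_running_loop()
    with concurrent.futures.ProcessPoolExecutor() as pool:
        # The computation runs in a worker process; the event loop stays free.
        result = await loop.run_in_executor(pool, cpu_bound_task, 10**6)
        print(f"Result: {result}")

# The guard is required on platforms that start worker processes with spawn.
if __name__ == "__main__":
    asyncio.run(main())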
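
The same call can fan several CPU-bound jobs out to the pool at once. The following is a sketch under stated assumptions: sum_of_squares and run_jobs are hypothetical names, and the input sizes are arbitrary. Each run_in_executor() call returns an awaitable, so asyncio.gather() can wait for all of them and return the results in submission order. Passing None as the executor falls back to the loop's default thread pool, which is convenient for quick checks but does not bypass the GIL.

```python
import asyncio
import concurrent.futures


def sum_of_squares(n):
    # Pure CPU work; defined at module level so it can be pickled
    # and sent to a worker process.
    return sum(i * i for i in range(n))


async def run_jobs(sizes, executor=None):
    # Hypothetical helper: submit one job per size and await them together.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, sum_of_squares, n) for n in sizes]
    return await asyncio.gather(*futures)


async def main():
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        results = await run_jobs([10, 100, 1000], pool)
    print(results)


if __name__ == "__main__":
    asyncio.run(main())
```

Process pools pay a cost for starting workers and pickling arguments, so this pattern tends to help only when each job does substantially more work than that overhead.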