Lập trình

Asyncio Shield: Chiếc khiên quyền năng

shield_cat_huongnq.id.vn

Trong lập trình async của Python, asyncio là một tính năng quan trọng giúp xử lý các tác vụ không đồng bộ (asynchronous task) một cách hiệu quả. Trong đó, asyncio.shield() là một tính năng quan trọng, giúp bảo vệ và duy trì một task ngay cả khi các tác vụ khác đang chạy hoặc được hủy bỏ.

asyncio.shield() Là Gì?

asyncio.shield() là một phương thức trong thư viện asyncio của Python, được thiết kế để bảo vệ một task khỏi việc bị hủy khi tác vụ khác yêu cầu hủy hoặc khi chúng ta gọi phương thức cancel() lên một task.

Khi một task được bảo vệ bằng asyncio.shield(), nó sẽ không bị hủy ngay cả khi có yêu cầu hủy từ bên ngoài (interrupt, cancel). Điều này rất hữu ích trong các tình huống mà chúng ta muốn task đó hoàn thành ngay cả khi có thay đổi hoặc yêu cầu hủy bỏ từ phía người dùng hoặc hệ thống.

Trường hợp sử dụng

Một trong những trường hợp sử dụng phổ biến nhất của asyncio.shield() là khi chúng ta muốn bảo vệ một task quan trọng trong khi đang thực hiện các tác vụ không đồng bộ khác. Ví dụ, trong ứng dụng web, khi có một yêu cầu HTTP được gửi đến và một task đang chạy để xử lý yêu cầu này, người dùng có thể yêu cầu hủy bỏ trước khi task kết thúc.

Ở đây, một ví dụ cụ thể là khi sử dụng fastapi để xử lý yêu cầu HTTP và có một task đang chạy mà chúng ta muốn bảo vệ.

from fastapi import FastAPI, HTTPException
import asyncio

app = FastAPI()

async def long_running_task():
    await asyncio.sleep(10)
    return "Task finished"

@app.get("/")
async def root():
    try:
        # Sheild task để task không bị hủy khi request bị hủy
        shielded_task = asyncio.shield(long_running_task())
        result = await shielded_task
        return {"message": result}
    except asyncio.CancelledError:
        raise HTTPException(status_code=499, detail="Client Closed Request")

Trong đoạn code trên, khi có yêu cầu HTTP được gửi đến endpoint /, một task được khởi chạy trong hàm long_running_task(). Bằng cách sử dụng asyncio.shield(), chúng ta bảo vệ task này, giúp nó hoàn thành ngay cả khi có yêu cầu hủy bỏ từ phía client (asyncio.CancelledError). Khi request bị hủy bỏ, server sẽ trả về một HTTPException với error code 499 để thông báo rằng người dùng đã đóng request.

Để hiểu rõ hơn về tầm quan trọng của việc sử dụng asyncio.shield() trong thực tế, chúng ta xem thêm một ví dụ về một ứng dụng SSE stream.

from fastapi import FastAPI, Response
import asyncio

app = FastAPI()

connected_clients = []

async def send_event_to_clients(event):
    for client in connected_clients:
        await client.send(event)

async def save_message_shielded(message):
    # Bảo vệ task lưu message vào cơ sở dữ liệu bằng asyncio.shield()
    shielded_task = asyncio.shield(save_message(message))
    await shielded_task

async def save_message(message):
    # Giả sử đây là hàm lưu message vào cơ sở dữ liệu
    await asyncio.sleep(2)  # Giả định việc lưu vào cơ sở dữ liệu mất 2 giây
    print(f"Message '{message}' saved successfully")

@app.get("/events")
async def events_endpoint(response: Response):
    response.headers["Content-Type"] = "text/event-stream"
    response.headers["Cache-Control"] = "no-cache"
    response.headers["Connection"] = "keep-alive"

    sse = response
    connected_clients.append(sse)

    try:
        message = ''
        # Loop để generate stream data
        while True:
            # Giả sử có một dòng dữ liệu mới từ SSE API
            event_data = "data: New data\n\n"

            message += "New data\n"

            await send_event_to_clients(event_data)
            await asyncio.sleep(1)

    except asyncio.CancelledError as e:
        print("Stream cancelled")

    finally:
        await save_message_shielded(message)
        connected_clients.remove(sse)
        print("Client disconnected")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="127.0.0.1", port=8000)


Khi một client kết nối đến, server bắt đầu trả về một luồng stream cho client, và mỗi giây sẽ gửi một dòng dữ liệu mới tới client.

Khi client ngắt kết nối trong khi stream vẫn đang chạy, có một vấn đề phát sinh. Task đang phục vụ request từ client sẽ bị hủy bởi event loop của FastAPI, điều này dẫn đến việc function lưu message vào database cũng bị hủy. Trong điều kiện thông thường, đoạn message đã được generate không được lưu lại vào database, gây nên việc mất mát thông tin.

Tuy nhiên, trong đoạn code trên, việc sử dụng asyncio.shield() để bảo vệ function lưu message vào database. Điều này đảm bảo rằng ngay cả khi task cha (task phục vụ request từ client) bị hủy, việc lưu message vẫn được thực hiện và không bị ảnh hưởng bởi việc hủy bất kỳ task nào khác trong event loop. Nhờ vào shield, message vẫn được lưu lại một cách an toàn vào database dù có hành động ngắt kết nối từ phía client xảy ra hay không.

Như vậy, asyncio.shield() là một công cụ quan trọng trong việc xử lý các tác asynchronous task trong Python. Việc hiểu cách sử dụng và áp dụng nó trong các trường hợp thực tế như trong fastapi có thể giúp chúng ta xây dựng các ứng dụng ổn định và có hiệu suất cao, đồng thời đảm bảo rằng các tác vụ quan trọng không bị gián đoạn bởi các yêu cầu hủy bỏ không mong muốn.

Leave a Reply

Your email address will not be published. Required fields are marked *