Description
Hello! Thank you for the incredible work on confluent-kafka-python and the new confluent_kafka.aio module.
We've recently migrated some of our Kafka workloads to the `.aio` module to better integrate with our FastAPI (uvicorn) application. However, we've encountered a consistent deadlock issue where `await consumer.poll()` or `await producer.produce()` never resolves, appearing to block or starve the asyncio event loop indefinitely.
After investigating the source code on the current master (and the v2.13.2 tree), we identified a couple of potential root causes related to how the event loop is acquired and how background tasks are scheduled inside the `.aio` wrappers. We would love to get your thoughts on whether this is a known limitation or whether we're using the API incorrectly in an ASGI context.
Observations / Potential Root Causes
1. `asyncio.get_event_loop()` vs `asyncio.get_running_loop()`
In `src/confluent_kafka/aio/_AIOConsumer.py` (Line 41):

```python
loop = asyncio.get_event_loop()
wrap_common_callbacks(loop, consumer_conf)
```

The constructor uses the deprecated `asyncio.get_event_loop()`. Under event-loop managers like uvicorn/uvloop, calling `get_event_loop()` outside of a running coroutine (e.g., during startup events or synchronous initialization) may return a different loop instance than the one that is actually running (`get_running_loop()`). Callbacks can then end up attached to an inactive loop and never fire.
Interestingly, `AIOProducer` correctly uses `asyncio.get_running_loop()` (Line 49):

```python
self._loop = asyncio.get_running_loop()
```

2. AIOProducer Batch Timeout Task Starvation
In `src/confluent_kafka/aio/producer/_AIOProducer.py`, `produce()` doesn't send immediately but batches messages. A background asyncio task (`BufferTimeoutManager._monitor_timeout()`) handles flushing.
If the FastAPI event loop becomes heavily utilized or blocked (e.g., by the `ThreadPoolExecutor` from a concurrent `AIOConsumer.poll()` call competing for the GIL or other resources), this `create_task`-scheduled timeout monitor can be starved. As a result, the buffer never flushes, and the futures returned from `produce()` never resolve.
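The starvation mechanism can be reproduced in isolation. The sketch below uses a hypothetical `monitor()` coroutine as a stand-in for `BufferTimeoutManager._monitor_timeout()`: while a coroutine holds the loop with synchronous work, the scheduled monitor task never gets a turn, so its "flush" never happens even though the deadline has long passed.

```python
import asyncio
import time

async def main():
    flushed = asyncio.Event()

    async def monitor():
        # Stand-in for the buffer-timeout monitor: flush after 0.05s.
        await asyncio.sleep(0.05)
        flushed.set()

    asyncio.create_task(monitor())

    # Synchronous work blocks the event loop for 0.2s without yielding,
    # so the monitor task is never scheduled during this window.
    time.sleep(0.2)
    starved = flushed.is_set()   # False: deadline passed, but no flush

    # Once the loop is yielded to, the monitor finally runs.
    await asyncio.sleep(0.2)
    return starved, flushed.is_set()

starved, eventually_flushed = asyncio.run(main())
print(starved, eventually_flushed)  # False True
```

In the real wrapper the blocking work would be anything that monopolizes the loop (or heavy GIL contention from executor threads); the effect on the monitor task is the same.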
Reproduction Environment
- confluent-kafka: 2.13.0 (and confirmed via source mapping to 2.13.2)
- Python: 3.10+
- Framework: FastAPI (uvicorn workers)
- OS: Linux (Docker Macvlan)
Our Current Workaround
We are currently bypassing `.aio` and using the synchronous `Consumer`/`Producer` in a dedicated `threading.Thread`, dispatching callbacks back to the main thread via `loop.call_soon_threadsafe()` (with the loop captured via `asyncio.get_running_loop()`).
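For context, a minimal sketch of that workaround pattern. The `blocking_poll_loop` function is a stand-in for a thread that calls the synchronous `consumer.poll(timeout=...)`; results are handed back to the asyncio loop thread-safely:

```python
import asyncio
import threading

def blocking_poll_loop(loop, out_queue, stop_event):
    # Stand-in for a real poll loop; in the actual workaround each
    # iteration would call the synchronous consumer.poll(timeout=...).
    for msg in ("m1", "m2", "m3"):
        if stop_event.is_set():
            break
        # Queue.put_nowait is not a coroutine, so it can be scheduled
        # onto the loop from this foreign thread safely.
        loop.call_soon_threadsafe(out_queue.put_nowait, msg)

async def main():
    loop = asyncio.get_running_loop()   # captured inside the running loop
    out_queue = asyncio.Queue()
    stop = threading.Event()
    worker = threading.Thread(
        target=blocking_poll_loop, args=(loop, out_queue, stop), daemon=True
    )
    worker.start()
    received = [await out_queue.get() for _ in range(3)]
    stop.set()
    worker.join()
    return received

received = asyncio.run(main())
print(received)  # ['m1', 'm2', 'm3']
```

The key point is that the blocking poll never touches the event loop directly; only thread-safe `call_soon_threadsafe` calls cross the boundary.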
Discussion
- Is the use of `get_event_loop()` in `AIOConsumer` intentional, or would a PR migrating it to `get_running_loop()` (matching `AIOProducer`) be accepted?
- Are there known caveats or recommended patterns when using the new `confluent_kafka.aio` classes inside ASGI frameworks like FastAPI, particularly regarding `max_workers` and `BufferTimeoutManager` starvation?
Any guidance or confirmation would be greatly appreciated. Thank you again for providing these asynchronous wrappers!