aiokafka
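In Python development, the asynchronous Kafka client of choice is almost always aiokafka, because it is currently the only Kafka client library in the Python community with direct asyncio support.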
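But this library has one big problem, the one described in the issue "[Producer] Performance drop when 'send' is called from multiple Futures #528": when many coroutines (typically the endpoints of a web service) push data concurrently, CPU usage becomes abnormally high. Because I needed to get the service back to normal quickly, I wrote a patch in a hurry. Endpoints only call push_message, which sends the messages out in order through an in-memory queue.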
import asyncio

from loguru import logger

from .mq import get_kafka_client

# Bounded in-memory queue of (completion event, (topic, value, key)).
push_queue: asyncio.Queue[
    tuple[asyncio.Event, tuple[str, bytes, bytes]]
] = asyncio.Queue(1000)


async def _real_push():
    # Drain the queue one message at a time, so that only this single
    # coroutine ever calls send_and_wait.
    while not push_queue.empty():
        event, (topic, value, key) = await push_queue.get()
        try:
            await get_kafka_client().send_and_wait(topic=topic, value=value, key=key)
        except Exception as e:
            # Failures are only logged; the caller is not notified.
            logger.error(f"Failed to push message to Kafka: {e}")
        finally:
            event.set()


worker: asyncio.Task | None = None


async def push_message(topic: str, value: bytes, key: bytes) -> None:
    """
    Push a message to a Kafka topic.
    """
    event = asyncio.Event()
    await push_queue.put((event, (topic, value, key)))
    global worker
    # Start the worker lazily, and restart it if it exited after draining.
    if worker is None or worker.done():
        worker = asyncio.create_task(_real_push())
    await event.wait()
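To look at the serialization pattern in isolation, here is a minimal sketch that swaps the real Kafka call for a stand-in. The names `fake_send`, `demo`, `drain`, and `push` are hypothetical and exist only for this illustration; the point is just that many concurrent callers end up being served one at a time, in the order they were queued.

```python
import asyncio

# Hypothetical stand-in for the real Kafka send. It records what was
# "sent" and the highest number of sends that were ever in flight at once.
sent: list[bytes] = []
in_flight = 0
max_in_flight = 0


async def fake_send(value: bytes) -> None:
    global in_flight, max_in_flight
    in_flight += 1
    max_in_flight = max(max_in_flight, in_flight)
    await asyncio.sleep(0)  # yield to the event loop, as a network send would
    sent.append(value)
    in_flight -= 1


async def demo(n: int = 50) -> None:
    queue: asyncio.Queue[tuple[asyncio.Event, bytes]] = asyncio.Queue(1000)
    worker: asyncio.Task | None = None

    async def drain() -> None:
        # The only coroutine that ever calls fake_send.
        while not queue.empty():
            event, value = await queue.get()
            try:
                await fake_send(value)
            finally:
                event.set()

    async def push(value: bytes) -> None:
        nonlocal worker
        event = asyncio.Event()
        await queue.put((event, value))
        if worker is None or worker.done():
            worker = asyncio.create_task(drain())
        await event.wait()

    # n concurrent callers, like n simultaneous web requests.
    await asyncio.gather(*(push(str(i).encode()) for i in range(n)))


asyncio.run(demo())
```

In this sketch the callers still wait for their own message to be handled, but the underlying send is never entered by more than one coroutine at a time, which is what sidesteps the concurrent-`send` hot path.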
confluent-kafka-python
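A long time later I finally had time to look for another solution. confluent-kafka-python, the Python wrapper around librdkafka, was practically the only option, because there are only three Python Kafka clients that look reasonably reliable: the pure-Python synchronous client kafka (the kafka-python package), the pure-Python asynchronous client aiokafka, and this library.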
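Because librdkafka does its work on background threads and reports results through callbacks, it is easy to hook into coroutine code in languages with stackless coroutines, such as Python and Rust. An asyncio producer that also keeps its polling thread alive is shown below. I had already rewritten the performance-critical service in Rust before this, and the remaining code does not need much concurrency, so I never benchmarked it. However, the rdkafka crate I used there is also a wrapper around librdkafka, so its performance should not differ much from the code below.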
import asyncio
from threading import Thread

import confluent_kafka


class AIOProducer:
    def __init__(self, bootstrap_servers: str, loop=None):
        self._loop = loop or asyncio.get_running_loop()
        self._producer = confluent_kafka.Producer({
            "bootstrap.servers": bootstrap_servers,
        })
        self._cancelled = False
        self._count = 0
        self._init_poll_thread()

    def _init_poll_thread(self):
        # Start the poll thread, or restart it if it has died.
        if hasattr(self, "_poll_thread") and self._poll_thread.is_alive():
            return
        self._poll_thread = Thread(target=self._poll_loop, daemon=True)
        self._poll_thread.start()

    def _poll_loop(self):
        # poll() serves delivery callbacks, so ack() runs on this thread.
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()
        self._producer.flush()

    async def produce(
        self, topic: str, value: bytes | str, key: bytes | str
    ) -> confluent_kafka.Message:
        result = self._loop.create_future()

        def ack(err, msg):
            # Called from the poll thread; hand the outcome to the event loop.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, confluent_kafka.KafkaException(err)
                )
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)

        self._count += 1
        self._producer.produce(topic, value, key, on_delivery=ack)
        # Every 256 messages, check that the poll thread is still alive.
        if self._count >= 256:
            self._count = 0
            self._init_poll_thread()
        return await result
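The core trick in the producer above is turning a callback fired on another thread into something a coroutine can await. The following is a minimal sketch of just that bridge, runnable without a broker. `FakeBackend` and its `submit` method are hypothetical stand-ins for a callback-based client and are not part of confluent-kafka-python; only the `asyncio` and `threading` calls are real APIs.

```python
import asyncio
import threading


class FakeBackend:
    """Hypothetical stand-in for a callback-based client such as librdkafka."""

    def submit(self, payload: bytes, on_done) -> None:
        # Invoke the callback from a separate thread, never the event loop's.
        def run():
            if payload:
                on_done(None, payload.upper())
            else:
                on_done(ValueError("empty payload"), None)

        threading.Thread(target=run, daemon=True).start()


async def send(backend: FakeBackend, payload: bytes) -> bytes:
    loop = asyncio.get_running_loop()
    future: asyncio.Future[bytes] = loop.create_future()

    def on_done(err, result):
        # Runs on the worker thread. asyncio futures are not thread-safe,
        # so the result is handed over via call_soon_threadsafe.
        if err is not None:
            loop.call_soon_threadsafe(future.set_exception, err)
        else:
            loop.call_soon_threadsafe(future.set_result, result)

    backend.submit(payload, on_done)
    return await future


async def main():
    backend = FakeBackend()
    ok = await send(backend, b"hello")
    try:
        await send(backend, b"")
    except ValueError as exc:
        failed = str(exc)
    else:
        failed = None
    return ok, failed


result = asyncio.run(main())
```

Calling `future.set_result` directly from the worker thread would not reliably wake the awaiting coroutine, which is why both the sketch and the producer above route everything through `call_soon_threadsafe`.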