From 8ca5d145bdc2aab6672f80ecb72afa5233c48b4d Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sun, 6 Sep 2020 00:56:52 +0200 Subject: [PATCH] Move the kafka consumer to the object --- src/chweb/consumer.py | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py index 4d8916f..d543620 100644 --- a/src/chweb/consumer.py +++ b/src/chweb/consumer.py @@ -21,24 +21,22 @@ class Consumer(Service): queue: asyncio.Queue): super().__init__(config, logger, event_loop, queue) self.db = Db(self.loop, self.logger, self.config.postgres) + self.consumer = aiokafka.AIOKafkaConsumer( + self.config.kafka.topic, + loop=self.loop, + bootstrap_servers=self.config.kafka.servers) async def consume(self): """ Consumes messages from a kafka topic and writes them in the database. """ - consumer = aiokafka.AIOKafkaConsumer( - self.config.kafka.topic, - loop=self.loop, - bootstrap_servers=self.config.kafka.servers) - - await consumer.start() + await self.consumer.start() try: async with self.db as db: await db.setup() - async for msg in consumer: + async for msg in self.consumer: try: check = Check(**json.loads(msg.value)) - self.queue.put_nowait(check) self.logger.debug(check) await db.save(check) except Exception as exc: @@ -47,7 +45,7 @@ class Consumer(Service): except Exception as exc: self.logger.error(exc) finally: - await consumer.stop() + await self.consumer.stop() def __call__(self) -> asyncio.Future: return self.consume()