diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py index 4d8916f..d543620 100644 --- a/src/chweb/consumer.py +++ b/src/chweb/consumer.py @@ -21,24 +21,22 @@ class Consumer(Service): queue: asyncio.Queue): super().__init__(config, logger, event_loop, queue) self.db = Db(self.loop, self.logger, self.config.postgres) + self.consumer = aiokafka.AIOKafkaConsumer( + self.config.kafka.topic, + loop=self.loop, + bootstrap_servers=self.config.kafka.servers) async def consume(self): """ Consumes messages from a kafka topic and writes them in the database. """ - consumer = aiokafka.AIOKafkaConsumer( - self.config.kafka.topic, - loop=self.loop, - bootstrap_servers=self.config.kafka.servers) - - await consumer.start() + await self.consumer.start() try: async with self.db as db: await db.setup() - async for msg in consumer: + async for msg in self.consumer: try: check = Check(**json.loads(msg.value)) - self.queue.put_nowait(check) self.logger.debug(check) await db.save(check) except Exception as exc: @@ -47,7 +45,7 @@ class Consumer(Service): except Exception as exc: self.logger.error(exc) finally: - await consumer.stop() + await self.consumer.stop() def __call__(self) -> asyncio.Future: return self.consume()