diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py index 5a8ca64..efeb534 100644 --- a/src/chweb/consumer.py +++ b/src/chweb/consumer.py @@ -17,7 +17,7 @@ from chweb.models import Check class Consumer(Service): @property def db(self): - return Db(self.loop, self.config.postgres.dbuser, + return Db(self.loop, self.logger, self.config.postgres.dbuser, self.config.postgres.dbpass, self.config.postgres.dbhost, self.config.postgres.dbport, self.config.postgres.dbname) @@ -34,13 +34,15 @@ class Consumer(Service): try: async with self.db as db: await db.setup() - # Consume messages from the kafka topic. async for msg in consumer: - check = Check(**json.loads(msg.value)) - self.queue.put_nowait(check) - await db.save(check) + try: + check = Check(**json.loads(msg.value)) + self.queue.put_nowait(check) + await db.save(check) + except Exception as exc: + err = "error processing message %s; failed with %s" + self.logger.error(err, msg, exc) finally: - # Will leave consumer group; perform autocommit if enabled. await consumer.stop() def __call__(self) -> asyncio.Future: @@ -51,9 +53,10 @@ class Db: """ Database operations and handy helpers. """ - def __init__(self, loop: asyncio.AbstractEventLoop, user: str, passwd: str, - host: str, port: int, dbname: str): + def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger, + user: str, passwd: str, host: str, port: int, dbname: str): self.loop = loop + self.logger = logger # Do a side effect here since without this there's not any point for # the application to start. Applies for tests as well. self.conn: Optional[asyncpg.Connection] = None @@ -117,5 +120,5 @@ class Db: data.request_time, data.response_time, data.status, data.url) except asyncpg.PostgresError as exc: - logger.error("error in query %s", exc.query) + self.logger.error("error in query %s", exc.query) raise