Handle broken messages, add logger to Db
This commit is contained in:
parent
0189f66cec
commit
7967b1d024
1 changed files with 12 additions and 9 deletions
|
@ -17,7 +17,7 @@ from chweb.models import Check
|
||||||
class Consumer(Service):
|
class Consumer(Service):
|
||||||
@property
|
@property
|
||||||
def db(self):
|
def db(self):
|
||||||
return Db(self.loop, self.config.postgres.dbuser,
|
return Db(self.loop, self.logger, self.config.postgres.dbuser,
|
||||||
self.config.postgres.dbpass, self.config.postgres.dbhost,
|
self.config.postgres.dbpass, self.config.postgres.dbhost,
|
||||||
self.config.postgres.dbport, self.config.postgres.dbname)
|
self.config.postgres.dbport, self.config.postgres.dbname)
|
||||||
|
|
||||||
|
@ -34,13 +34,15 @@ class Consumer(Service):
|
||||||
try:
|
try:
|
||||||
async with self.db as db:
|
async with self.db as db:
|
||||||
await db.setup()
|
await db.setup()
|
||||||
# Consume messages from the kafka topic.
|
|
||||||
async for msg in consumer:
|
async for msg in consumer:
|
||||||
|
try:
|
||||||
check = Check(**json.loads(msg.value))
|
check = Check(**json.loads(msg.value))
|
||||||
self.queue.put_nowait(check)
|
self.queue.put_nowait(check)
|
||||||
await db.save(check)
|
await db.save(check)
|
||||||
|
except Exception as exc:
|
||||||
|
err = "error processing message %s; failed with %s"
|
||||||
|
self.logger.error(err, msg, exc)
|
||||||
finally:
|
finally:
|
||||||
# Will leave consumer group; perform autocommit if enabled.
|
|
||||||
await consumer.stop()
|
await consumer.stop()
|
||||||
|
|
||||||
def __call__(self) -> asyncio.Future:
|
def __call__(self) -> asyncio.Future:
|
||||||
|
@ -51,9 +53,10 @@ class Db:
|
||||||
"""
|
"""
|
||||||
Database operations and handy helpers.
|
Database operations and handy helpers.
|
||||||
"""
|
"""
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, user: str, passwd: str,
|
def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger,
|
||||||
host: str, port: int, dbname: str):
|
user: str, passwd: str, host: str, port: int, dbname: str):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
|
self.logger = logger
|
||||||
# Do a side effect here since without this there's not any point for
|
# Do a side effect here since without this there's not any point for
|
||||||
# the application to start. Applies for tests as well.
|
# the application to start. Applies for tests as well.
|
||||||
self.conn: Optional[asyncpg.Connection] = None
|
self.conn: Optional[asyncpg.Connection] = None
|
||||||
|
@ -117,5 +120,5 @@ class Db:
|
||||||
data.request_time, data.response_time, data.status,
|
data.request_time, data.response_time, data.status,
|
||||||
data.url)
|
data.url)
|
||||||
except asyncpg.PostgresError as exc:
|
except asyncpg.PostgresError as exc:
|
||||||
logger.error("error in query %s", exc.query)
|
self.logger.error("error in query %s", exc.query)
|
||||||
raise
|
raise
|
||||||
|
|
Loading…
Reference in a new issue