From 3ec541c3f5c5b958bd68c5a80c1524e9fa963bbf Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sun, 6 Sep 2020 00:42:21 +0200 Subject: [PATCH] Fix formatting, log debug kafka calls --- src/chweb/cmd.py | 5 +++-- src/chweb/collector.py | 6 ++++-- src/chweb/consumer.py | 35 +++++++++++++++++------------------ 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/chweb/cmd.py b/src/chweb/cmd.py index a819299..8941ce7 100644 --- a/src/chweb/cmd.py +++ b/src/chweb/cmd.py @@ -28,9 +28,10 @@ def configure(name) -> Tuple[Config, Logger]: args = parser.parse_args() with open(args.config, 'r') as conf_file: - config = yaml.load(conf_file, Loader=yaml.FullLoader) - logging.config.dictConfig(config['logging']) + conf = yaml.load(conf_file, Loader=yaml.FullLoader) + logging.config.dictConfig(conf['logging']) + config = create_config(conf) logger = logging.getLogger("chweb.{}".format(name)) return (config, logger) diff --git a/src/chweb/collector.py b/src/chweb/collector.py index 105ab1a..c2b2d20 100644 --- a/src/chweb/collector.py +++ b/src/chweb/collector.py @@ -31,8 +31,9 @@ class Collector(Service): ``chweb.collector.Collector.check_forever``. """ res = await self.loop.run_in_executor(None, requests.get, url) - matches = None # The matches value should be None since the regex can - # be ommited from the config. + # Should be treated as nullable since the regex can be ommited from the + # config. + matches = None if regex is not None: matches = re.search(regex, res.text) is not None return Check( @@ -100,6 +101,7 @@ class Producer(Service): while True: check = await self.queue.get() msg = bytes(check.json().encode("utf-8")) + self.logger.debug(check) await self.producer.send_and_wait(self.config.kafka.topic, msg) except Exception as exc: self.logger.error(exc) diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py index efeb534..5cc8cc2 100644 --- a/src/chweb/consumer.py +++ b/src/chweb/consumer.py @@ -7,19 +7,17 @@ import logging import time from typing import Any, Dict, Optional -import aiokafka # type: ignore -import asyncpg # type: ignore +import aiokafka # type: ignore +import asyncpg # type: ignore from chweb.base import Service -from chweb.models import Check +from chweb.models import Check, PostgresConfig class Consumer(Service): @property def db(self): - return Db(self.loop, self.logger, self.config.postgres.dbuser, - self.config.postgres.dbpass, self.config.postgres.dbhost, - self.config.postgres.dbport, self.config.postgres.dbname) + return Db(self.loop, self.logger, self.config.postgres) async def consume(self): """ @@ -38,6 +36,7 @@ class Consumer(Service): try: check = Check(**json.loads(msg.value)) self.queue.put_nowait(check) + self.logger.debug(check) await db.save(check) except Exception as exc: err = "error processing message %s; failed with %s" @@ -53,24 +52,25 @@ class Db: """ Database operations and handy helpers. """ + def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger, - user: str, passwd: str, host: str, port: int, dbname: str): + pgconf: PostgresConfig): self.loop = loop self.logger = logger # Do a side effect here since without this there's not any point for # the application to start. Applies for tests as well. self.conn: Optional[asyncpg.Connection] = None - self.user = user - self.passwd = passwd - self.host = host - self.port = port - self.dbname = dbname + self.conf = pgconf async def __aenter__(self): - self.conn = await asyncpg.connect(host=self.host, port=self.port, - user=self.user, password=self.passwd, - database=self.dbname, loop=self.loop, - timeout=60) + self.conn = await asyncpg.connect( + host=self.conf.dbhost, + port=self.conf.dbport, + user=self.conf.dbuser, + password=self.conf.dbpass, + database=self.conf.dbname, + loop=self.loop, timeout=60, + ) return self async def __aexit__(self, type_, value, traceback): @@ -108,7 +108,6 @@ class Db: Writes a single record in the database. This is not very optimal, a better way would be to write a batch of status checks at once. """ - tstamp = time.mktime(data.request_time.timetuple()) if self.conn is not None: try: await self.conn.execute(''' @@ -120,5 +119,5 @@ class Db: data.request_time, data.response_time, data.status, data.url) except asyncpg.PostgresError as exc: - self.logger.error("error in query %s", exc.query) + self.logger.error("error in query %s", exc) raise