From 0189f66cec963650802c7fb1aa63ed3d8a2c1e07 Mon Sep 17 00:00:00 2001
From: Vladan Popovic
Date: Sat, 5 Sep 2020 02:38:01 +0200
Subject: [PATCH] Implement database operations

---
 src/chweb/cmd.py      | 27 ++++++++++++++++-
 src/chweb/consumer.py | 98 +++++++++++++++++++++++++++++++++++++------
 2 files changed, 111 insertions(+), 14 deletions(-)

diff --git a/src/chweb/cmd.py b/src/chweb/cmd.py
index 6599b7a..29724e6 100644
--- a/src/chweb/cmd.py
+++ b/src/chweb/cmd.py
@@ -7,6 +7,7 @@ import logging
 import logging.config
 from logging import Logger
 from typing import Tuple
+import os
 
 import yaml
 from chweb.collector import Collector, Producer
@@ -25,11 +26,35 @@ def configure(name) -> Tuple[Config, Logger]:
                         help=('The yaml config file. '
                               'Defaults to /etc/checker.yaml'))
     args = parser.parse_args()
+
+    kafka_servers_env = os.getenv('KAFKA_SERVERS')
+    kafka_servers = (kafka_servers_env.split(',')
+                     if kafka_servers_env else None)
+
+    kafka_topic = os.getenv('KAFKA_TOPIC')
+
+    pg_db = os.getenv('POSTGRES_DB')
+    pg_host = os.getenv('POSTGRES_HOST')
+    pg_port = os.getenv('POSTGRES_PORT')
+    pg_user = os.getenv('POSTGRES_USER')
+    pg_pass = os.getenv('POSTGRES_PASS')
+
     with open(args.config, 'r') as conf_file:
         config = yaml.load(conf_file, Loader=yaml.FullLoader)
 
     logging.config.dictConfig(config['logging'])
+
+    config = Config(**config)
+    config.kafka.servers = kafka_servers or config.kafka.servers
+    config.kafka.topic = kafka_topic or config.kafka.topic
+    config.postgres.dbhost = pg_host or config.postgres.dbhost
+    config.postgres.dbname = pg_db or config.postgres.dbname
+    config.postgres.dbport = (int(pg_port) if pg_port
+                              else config.postgres.dbport)
+    config.postgres.dbuser = pg_user or config.postgres.dbuser
+    config.postgres.dbpass = pg_pass or config.postgres.dbpass
+
     logger = logging.getLogger("chweb.{}".format(name))
-    return (Config(**config), logger)
+    return (config, logger)
 
 def collect():
diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py
index 497304d..5a8ca64 100644
--- a/src/chweb/consumer.py
+++ b/src/chweb/consumer.py
@@ -4,7 +4,7 @@ Sample consumer.
 import asyncio
 import json
 import logging
-from typing import Any, Dict
+from typing import Any, Dict, Optional
 
 import aiokafka  # type: ignore
 import asyncpg  # type: ignore
@@ -14,6 +14,12 @@ from chweb.models import Check
 
 
 class Consumer(Service):
+    @property
+    def db(self):
+        return Db(self.loop, self.config.postgres.dbuser,
+                  self.config.postgres.dbpass, self.config.postgres.dbhost,
+                  self.config.postgres.dbport, self.config.postgres.dbname)
+
     async def consume(self):
         """
         Consumes messages from a kafka topic and writes them in the database.
@@ -25,11 +31,13 @@
 
         await consumer.start()
         try:
-            # Consume messages from the kafka topic.
-            async for msg in consumer:
-                check_info = Check(**json.loads(msg.value))
-                self.queue.put_nowait(check_info)
-                self.logger.info(check_info)
+            async with self.db as db:
+                await db.setup()
+                # Consume messages from the kafka topic.
+                async for msg in consumer:
+                    check = Check(**json.loads(msg.value))
+                    self.queue.put_nowait(check)
+                    await db.save(check)
         finally:
             # Will leave consumer group; perform autocommit if enabled.
             await consumer.stop()
@@ -39,10 +47,74 @@
 
 
 class Db:
-    async def consume_and_save(self):
-        try:
-            while True:
-                status = await self.queue.get()
-                yield status
-        finally:
-            self.logger.info("Queue reader stopped.")
+    """
+    Database operations and handy helpers.
+ """ + def __init__(self, loop: asyncio.AbstractEventLoop, user: str, passwd: str, + host: str, port: int, dbname: str): + self.loop = loop + # Do a side effect here since without this there's not any point for + # the application to start. Applies for tests as well. + self.conn: Optional[asyncpg.Connection] = None + self.user = user + self.passwd = passwd + self.host = host + self.port = port + self.dbname = dbname + + async def __aenter__(self): + self.conn = await asyncpg.connect(host=self.host, port=self.port, + user=self.user, password=self.passwd, + database=self.dbname, loop=self.loop, + timeout=60) + return self + + async def __aexit__(self, type_, value, traceback): + await self.conn.close() + + async def setup(self): + """ + Setup the database, i.e. create the table and set up the indexes. + """ + await self.conn.execute(''' + CREATE TABLE IF NOT EXISTS statuses( + id SERIAL PRIMARY KEY, + domain TEXT NOT NULL, + regex TEXT NULL, + regex_matches BOOLEAN NULL, + request_time TIMESTAMP NOT NULL, + response_time INTEGER NOT NULL, + status INTEGER NOT NULL, + url text NOT NULL + ); + CREATE INDEX IF NOT EXISTS + statuses_domain ON statuses(domain); + CREATE INDEX IF NOT EXISTS + statuses_status ON statuses(status); + CREATE INDEX IF NOT EXISTS + statuses_request_time ON statuses(request_time); + CREATE INDEX IF NOT EXISTS + statuses_response_time ON statuses(response_time); + CREATE INDEX IF NOT EXISTS + statuses_regex_matches ON statuses(regex_matches); + ''') + + async def save(self, data: Check): + """ + Writes a single record in the database. This is not very optimal, a + better way would be to write a batch of status checks at once. + """ + tstamp = time.mktime(data.request_time.timetuple()) + if self.conn is not None: + try: + await self.conn.execute(''' + INSERT INTO statuses (domain, regex, regex_matches, + request_time, response_time, + status, url) + VALUES($1, $2, $3, $4, $5, $6, $7) + ''', data.domain, data.regex, data.regex_matches, + data.request_time, data.response_time, data.status, + data.url) + except asyncpg.PostgresError as exc: + logger.error("error in query %s", exc.query) + raise