From 60a944cf5e2d56bde4b26a8383485d7b83eeb94b Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Fri, 4 Sep 2020 18:22:49 +0200 Subject: [PATCH] Refactor, rename and fix pydantic errors --- config.yaml | 13 ++++++-- setup.cfg | 8 ++--- src/chweb/cmd.py | 51 +++++++++++++++++++++++++++++ src/{webstat => chweb}/collector.py | 48 ++++++++++++++------------- src/{webstat => chweb}/consumer.py | 23 +++++++------ src/{webstat => chweb}/models.py | 0 src/webstat/cmd.py | 34 ------------------- tox.ini | 2 +- 8 files changed, 105 insertions(+), 74 deletions(-) create mode 100644 src/chweb/cmd.py rename src/{webstat => chweb}/collector.py (64%) rename src/{webstat => chweb}/consumer.py (70%) rename src/{webstat => chweb}/models.py (100%) delete mode 100644 src/webstat/cmd.py diff --git a/config.yaml b/config.yaml index 19810a1..6ba901c 100644 --- a/config.yaml +++ b/config.yaml @@ -1,6 +1,13 @@ -kafka_servers: -- "localhost:9992" -kafka_topic: "sample" +kafka: + servers: + - "localhost:9992" + topic: "sample" +postgres: + dbhost: "localhost" + dbport: 5432 + dbname: "chweb" + dbuser: "vladan" + dbpass: "" sites: - url: "https://example.com" regex: "domain" diff --git a/setup.cfg b/setup.cfg index 53785b2..f25077d 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] -name = webstat -summary = Tool for collecting website stats. +name = chweb +summary = Tool for checking websites and collecting the results. description-file = README.rst author = Vladan Popovic author-email = vladanovic@gmail.com @@ -19,8 +19,8 @@ where=src [options.entry_points] console_scripts = - wstat_collect = webstat.cmd:collect - wstat_consume = webstat.cmd:consume + chweb_collect = chweb.cmd:collect + chweb_consume = chweb.cmd:consume [bdist_wheel] universal = 1 diff --git a/src/chweb/cmd.py b/src/chweb/cmd.py new file mode 100644 index 0000000..7336e6b --- /dev/null +++ b/src/chweb/cmd.py @@ -0,0 +1,51 @@ +""" +A module containing all console script functions. 
+"""
+import argparse
+import asyncio
+import yaml
+
+from chweb.collector import Collector
+from chweb.consumer import Consumer
+from chweb.models import Config
+
+
+def configure() -> Config:
+    """
+    Gets the configuration and creates a Pydantic model from the parsed YAML.
+    """
+    parser = argparse.ArgumentParser(
+        description='Website availability checker.')
+    parser.add_argument('--config', type=str,
+                        default="/etc/checker.yaml",
+                        help=('The yaml config file. '
+                              'Defaults to /etc/checker.yaml'))
+    args = parser.parse_args()
+    with open(args.config, 'r') as conf_file:
+        config = yaml.load(conf_file, Loader=yaml.FullLoader)
+    return Config(**config)
+
+
+def run(Service):
+    """
+    Runs a service in an event loop.
+    """
+    loop = asyncio.get_event_loop()
+    queue = asyncio.Queue()
+    config = configure()
+    service = Service(config, loop, queue)
+    service.run()
+
+
+def collect():
+    """
+    Main producer event loop.
+    """
+    run(Collector)
+
+
+def consume():
+    """
+    Main consumer event loop.
+    """
+    run(Consumer)
diff --git a/src/webstat/collector.py b/src/chweb/collector.py
similarity index 64%
rename from src/webstat/collector.py
rename to src/chweb/collector.py
index 9c7a6c7..dc8f022 100644
--- a/src/webstat/collector.py
+++ b/src/chweb/collector.py
@@ -4,25 +4,28 @@ Checks status of web servers and sends them to a configured Kafka topic.
 import asyncio
 import json
 import re
-from typing import Any, Dict, List, Optional
+from typing import Optional
+from urllib.parse import urlparse
 
 import aiokafka  # type: ignore
 import requests
 
+from chweb.models import Config, Check
+
 
 class Collector:
     """
     A class that contains all methods needed to check the statuses of all
     websites present in the config.
""" - def __init__(self, config: Dict[str, Any], + def __init__(self, config: Config, event_loop: asyncio.AbstractEventLoop, queue: asyncio.Queue): self.config = config self.loop = event_loop self.queue = queue - async def get_status(self, url: str, regex: Optional[str]) -> Dict[str, Any]: + async def check(self, url: str, regex: Optional[str]) -> Check: """ Checks the status of a website and optionally matches a regex on the response body. @@ -36,15 +39,16 @@ class Collector: # be ommited from the config. if regex is not None: matches = re.search(regex, res.text) is not None - return { - 'url': url, - 'regex': regex, - 'status': res.status_code, - 'response_time': res.elapsed.microseconds, - 'regex_matches': matches, - } + return Check( + domain=urlparse(res.url).netloc, + regex=regex, + response_time=res.elapsed.microseconds, + regex_matches=matches, + status=res.status_code, + url=res.url, + ) - async def create_periodic_task(self, site): + async def check_forever(self, site): """ A void function that gets the status of a site and sends it to an ``asyncio.Queue`` for further processing (sending to a Kafka topic). @@ -52,9 +56,9 @@ class Collector: :param site: A site object from the config. 
""" while True: - data = await self.get_status(site["url"], site.get("regex")) + data = await self.check(site.url, site.regex) self.queue.put_nowait(data) - await asyncio.sleep(site["check_interval"]) + await asyncio.sleep(site.check_interval) async def produce(self): """ @@ -64,23 +68,23 @@ class Collector: """ producer = aiokafka.AIOKafkaProducer( loop=self.loop, - bootstrap_servers=self.config["kafka_servers"]) + bootstrap_servers=self.config.kafka.servers) await producer.start() try: while True: - status = await self.queue.get() - msg = bytes(json.dumps(status).encode("utf-8")) - await producer.send_and_wait(self.config["kafka_topic"], msg) + check = await self.queue.get() + msg = bytes(check.json().encode("utf-8")) + await producer.send_and_wait(self.config.kafka.topic, msg) finally: await producer.stop() - def tasks(self) -> List[asyncio.Task]: + def run(self): """ - Creates a task for every site. + Runs all tasks in the event loop. """ def create_task(site) -> asyncio.Task: - return self.loop.create_task(self.create_periodic_task(site)) - tasks = list(map(create_task, self.config["sites"])) + return self.loop.create_task(self.check_forever(site)) + tasks = list(map(create_task, self.config.sites)) tasks.append(self.loop.create_task(self.produce())) - return tasks + self.loop.run_until_complete(asyncio.gather(*tasks)) diff --git a/src/webstat/consumer.py b/src/chweb/consumer.py similarity index 70% rename from src/webstat/consumer.py rename to src/chweb/consumer.py index 7043468..ae14caa 100644 --- a/src/webstat/consumer.py +++ b/src/chweb/consumer.py @@ -3,11 +3,13 @@ Sample consumer. """ import asyncio import json -from typing import Any, Dict, List +from typing import Any, Dict import aiokafka # type: ignore import asyncpg # type: ignore +from chweb.models import Check + class Consumer: def __init__(self, config: Dict[str, Any], @@ -22,15 +24,15 @@ class Consumer: Consumes messages from a Kafka topic. 
""" consumer = aiokafka.AIOKafkaConsumer( - self.config['kafka_topic'], + self.config.kafka.topic, loop=self.loop, - bootstrap_servers=self.config['kafka_servers']) + bootstrap_servers=self.config.kafka.servers) await consumer.start() try: # Consume messages async for msg in consumer: - self.queue.put_nowait(json.loads(msg.value)) + self.queue.put_nowait(Check(**json.loads(msg.value))) finally: # Will leave consumer group; perform autocommit if enabled. await consumer.stop() @@ -49,11 +51,12 @@ class Consumer: finally: print("EXITED!") - def tasks(self) -> List[asyncio.Task]: + def run(self): """ - Creates tasks for reading from the Kafka topic and writing in - PostgreSQL. + Runs all tasks in the event loop. """ - kafka_consumer = self.loop.create_task(self.consume()) - psql_writer = self.loop.create_task(self.write()) - return [kafka_consumer, psql_writer] + tasks = [ + self.loop.create_task(self.consume()), + self.loop.create_task(self.write()), + ] + self.loop.run_until_complete(asyncio.gather(*tasks)) diff --git a/src/webstat/models.py b/src/chweb/models.py similarity index 100% rename from src/webstat/models.py rename to src/chweb/models.py diff --git a/src/webstat/cmd.py b/src/webstat/cmd.py deleted file mode 100644 index 7b49000..0000000 --- a/src/webstat/cmd.py +++ /dev/null @@ -1,34 +0,0 @@ -""" -A module containing all console script functions. -""" -import asyncio -import yaml - -from webstat.collector import Collector -from webstat.consumer import Consumer - - -def run(Service): - """ - A factory kinda that runs both services in an event loop. - """ - loop = asyncio.get_event_loop() - queue = asyncio.Queue() - with open('config.yaml', 'r') as conf_file: - config = yaml.load(conf_file, Loader=yaml.FullLoader) - tasks = Service(config, loop, queue).tasks() - loop.run_until_complete(asyncio.gather(*tasks)) - - -def collect(): - """ - Main producer event loop. - """ - run(Collector) - - -def consume(): - """ - Main consumer event loop. 
- """ - run(Consumer) diff --git a/tox.ini b/tox.ini index e190c76..a4bcfcc 100644 --- a/tox.ini +++ b/tox.ini @@ -8,7 +8,7 @@ deps = pytest-cov pytest-mock commands = - pytest --cov=webstat --cov-append --cov-report=term-missing {posargs} + pytest --cov=chweb --cov-append --cov-report=term-missing {posargs} [testenv:lint] deps = pylint