From fff8e6e4cc329630ac3d259d056ef4a513bfed5b Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sat, 5 Sep 2020 00:06:57 +0200 Subject: [PATCH] Refactor once again - Split collector and producer. - Rename Application to Service. - Use __call__() instead of run(). --- src/chweb/base.py | 5 +---- src/chweb/cmd.py | 38 ++++++++++++++++++++------------------ src/chweb/collector.py | 41 ++++++++++++++++++++++++----------------- src/chweb/consumer.py | 35 +++++++++++++---------------------- 4 files changed, 58 insertions(+), 61 deletions(-) diff --git a/src/chweb/base.py b/src/chweb/base.py index f05438d..86d37d8 100644 --- a/src/chweb/base.py +++ b/src/chweb/base.py @@ -7,7 +7,7 @@ import logging from chweb.models import Config -class Application: +class Service: """ A base class for applications / services. """ @@ -19,6 +19,3 @@ class Application: self.logger = logger self.loop = event_loop self.queue = queue - - def run(self): - raise NotImplementedError() diff --git a/src/chweb/cmd.py b/src/chweb/cmd.py index ac470e5..6599b7a 100644 --- a/src/chweb/cmd.py +++ b/src/chweb/cmd.py @@ -9,7 +9,7 @@ from logging import Logger from typing import Tuple import yaml -from chweb.collector import Collector +from chweb.collector import Collector, Producer from chweb.consumer import Consumer from chweb.models import Config @@ -20,7 +20,7 @@ def configure(name) -> Tuple[Config, Logger]: """ parser = argparse.ArgumentParser( description='Website availibility checker.') - parser.add_argument('--config', type=str, + parser.add_argument('-c', '--config', type=str, default="/etc/checker.yaml", help=('The yaml config file. ' 'Defaults to /etc/checker.yaml')) @@ -32,29 +32,31 @@ def configure(name) -> Tuple[Config, Logger]: return (Config(**config), logger) -def run(Service): - """ - Runs a service in an event loop. - """ - loop = asyncio.get_event_loop() - queue = asyncio.Queue() - config, logger = configure(Service.__name__) - logger.info(("Starting service on kafka [cluster]/topic: " - "{}/{}").format(config.kafka.servers, - config.kafka.topic)) - service = Service(config, logger, loop, queue) - service.run() - - def collect(): """ Main producer event loop. """ - run(Collector) + loop = asyncio.get_event_loop() + queue = asyncio.Queue() + + config, logger = configure("collector") + logger.info("Starting collector for %d sites.", len(config.sites)) + collector = Collector(config, logger, loop, queue) + logger.info(("Starting kafka producer on kafka [cluster]/topic: " + "%s/%s"), config.kafka.servers, config.kafka.topic) + producer = Producer(config, logger, loop, queue) + loop.run_until_complete(asyncio.gather(collector(), producer())) def consume(): """ Main consumer event loop. """ - run(Consumer) + loop = asyncio.get_event_loop() + queue = asyncio.Queue() + + config, logger = configure("consumer") + logger.info(("Starting kafka consumer on kafka [cluster]/topic: " + "%s/%s"), config.kafka.servers, config.kafka.topic) + consumer = Consumer(config, logger, loop, queue) + loop.run_until_complete(consumer()) diff --git a/src/chweb/collector.py b/src/chweb/collector.py index c6e88d3..a37b9c6 100644 --- a/src/chweb/collector.py +++ b/src/chweb/collector.py @@ -8,12 +8,13 @@ from urllib.parse import urlparse import aiokafka # type: ignore import requests +from requests import ConnectionError -from chweb.base import Application +from chweb.base import Service from chweb.models import Check, SiteConfig -class Collector(Application): +class Collector(Service): """ A class that contains all methods needed to check the statuses of all websites present in the config. @@ -44,10 +45,10 @@ class Collector(Application): async def check_forever(self, site: SiteConfig): """ - A void function that checks a site and sends the result to an - :class:`asyncio.Queue` for further processing, i.e. sends to a Kafka - topic when it's read from the queue in - :meth:`chweb.collector.Collector.produce` (as in produce data for the + A void function that checks the status of a site and sends the result + in an :class:`asyncio.Queue` for further processing, i.e. the check + info is sent to a Kafka topic in + :meth:`chweb.collector.Producer.produce` (as in produce data for the Kafka consumers, defined in :class:`chweb.consumer.Consumer.consume`. :param site: A :py:class:`chweb.models.SiteConfig` object from the @@ -56,13 +57,27 @@ class Collector(Application): while True: try: data = await self.check(site.url, site.regex) - except Exception as exc: + except ConnectionError as exc: errmsg = "{}; {}".format(site.url, exc) self.logger.error(errmsg) break # Break the loop and destroy the Task. self.queue.put_nowait(data) await asyncio.sleep(site.check_interval) + def __call__(self) -> asyncio.Future: + def create_task(site) -> asyncio.Task: + return self.loop.create_task(self.check_forever(site)) + tasks = map(create_task, self.config.sites) + return asyncio.gather(*tasks) + + +class Producer(Service): + """ + Kafka producer. + + Reads from the queue that :class:`chweb.collector.Collector` writes in and + sends all messages in a kafka topic. + """ async def produce(self): """ Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop @@ -83,13 +98,5 @@ class Collector(Application): self.logger.warning("Kafka producer destroyed!") await producer.stop() - def run(self): - """ - Runs all tasks in the event loop. - """ - def create_task(site) -> asyncio.Task: - return self.loop.create_task(self.check_forever(site)) - tasks = list(map(create_task, self.config.sites)) - tasks.append(self.loop.create_task(self.produce())) - self.loop.run_until_complete(asyncio.gather(*tasks)) - self.logger.info("Checker stopped ...") + def __call__(self) -> asyncio.Future: + return self.produce() diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py index d7f7df6..497304d 100644 --- a/src/chweb/consumer.py +++ b/src/chweb/consumer.py @@ -9,14 +9,14 @@ from typing import Any, Dict import aiokafka # type: ignore import asyncpg # type: ignore -from chweb.base import Application +from chweb.base import Service from chweb.models import Check -class Consumer(Application): +class Consumer(Service): async def consume(self): """ - Consumes messages from a Kafka topic. + Consumes messages from a kafka topic and writes them in the database. """ consumer = aiokafka.AIOKafkaConsumer( self.config.kafka.topic, @@ -25,33 +25,24 @@ class Consumer(Application): await consumer.start() try: - # Consume messages + # Consume messages from the kafka topic. async for msg in consumer: - self.queue.put_nowait(Check(**json.loads(msg.value))) + check_info = Check(**json.loads(msg.value)) + self.queue.put_nowait(check_info) + self.logger.info(check_info) finally: # Will leave consumer group; perform autocommit if enabled. await consumer.stop() + def __call__(self) -> asyncio.Future: + return self.consume() - async def save(self, pool, data): - async with pool.acquire() as conn: - async with conn.cursor() as cur: - await cur.execute("SELECT 1") - async def write(self): +class Db: + async def consume_and_save(self): try: while True: status = await self.queue.get() - print(status) + yield status finally: - print("EXITED!") - - def run(self): - """ - Runs all tasks in the event loop. - """ - tasks = [ - self.loop.create_task(self.consume()), - self.loop.create_task(self.write()), - ] - self.loop.run_until_complete(asyncio.gather(*tasks)) + self.logger.info("Queue reader stopped.")