Refactor once again

- Split collector and producer.
- Rename Application to Service.
- Use __call__() instead of run().
This commit is contained in:
Vladan Popovic 2020-09-05 00:06:57 +02:00
parent cc4b49a34e
commit fff8e6e4cc
4 changed files with 58 additions and 61 deletions

View File

@ -7,7 +7,7 @@ import logging
from chweb.models import Config
class Application:
class Service:
"""
A base class for applications / services.
"""
@ -19,6 +19,3 @@ class Application:
self.logger = logger
self.loop = event_loop
self.queue = queue
def run(self):
raise NotImplementedError()

View File

@ -9,7 +9,7 @@ from logging import Logger
from typing import Tuple
import yaml
from chweb.collector import Collector
from chweb.collector import Collector, Producer
from chweb.consumer import Consumer
from chweb.models import Config
@ -20,7 +20,7 @@ def configure(name) -> Tuple[Config, Logger]:
"""
parser = argparse.ArgumentParser(
description='Website availibility checker.')
parser.add_argument('--config', type=str,
parser.add_argument('-c', '--config', type=str,
default="/etc/checker.yaml",
help=('The yaml config file. '
'Defaults to /etc/checker.yaml'))
@ -32,29 +32,31 @@ def configure(name) -> Tuple[Config, Logger]:
return (Config(**config), logger)
def run(Service):
"""
Runs a service in an event loop.
"""
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
config, logger = configure(Service.__name__)
logger.info(("Starting service on kafka [cluster]/topic: "
"{}/{}").format(config.kafka.servers,
config.kafka.topic))
service = Service(config, logger, loop, queue)
service.run()
def collect():
"""
Main producer event loop.
"""
run(Collector)
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
config, logger = configure("collector")
logger.info("Starting collector for %d sites.", len(config.sites))
collector = Collector(config, logger, loop, queue)
logger.info(("Starting kafka producer on kafka [cluster]/topic: "
"%s/%s"), config.kafka.servers, config.kafka.topic)
producer = Producer(config, logger, loop, queue)
loop.run_until_complete(asyncio.gather(collector(), producer()))
def consume():
"""
Main consumer event loop.
"""
run(Consumer)
loop = asyncio.get_event_loop()
queue = asyncio.Queue()
config, logger = configure("consumer")
logger.info(("Starting kafka consumer on kafka [cluster]/topic: "
"%s/%s"), config.kafka.servers, config.kafka.topic)
consumer = Consumer(config, logger, loop, queue)
loop.run_until_complete(consumer())

View File

@ -8,12 +8,13 @@ from urllib.parse import urlparse
import aiokafka # type: ignore
import requests
from requests import ConnectionError
from chweb.base import Application
from chweb.base import Service
from chweb.models import Check, SiteConfig
class Collector(Application):
class Collector(Service):
"""
A class that contains all methods needed to check the statuses of all
websites present in the config.
@ -44,10 +45,10 @@ class Collector(Application):
async def check_forever(self, site: SiteConfig):
"""
A void function that checks a site and sends the result to an
:class:`asyncio.Queue` for further processing, i.e. sends to a Kafka
topic when it's read from the queue in
:meth:`chweb.collector.Collector.produce` (as in produce data for the
A void function that checks the status of a site and sends the result
in an :class:`asyncio.Queue` for further processing, i.e. the check
info is sent to a Kafka topic in
:meth:`chweb.collector.Producer.produce` (as in produce data for the
Kafka consumers, defined in :class:`chweb.consumer.Consumer.consume`.
:param site: A :py:class:`chweb.models.SiteConfig` object from the
@ -56,13 +57,27 @@ class Collector(Application):
while True:
try:
data = await self.check(site.url, site.regex)
except Exception as exc:
except ConnectionError as exc:
errmsg = "{}; {}".format(site.url, exc)
self.logger.error(errmsg)
break # Break the loop and destroy the Task.
self.queue.put_nowait(data)
await asyncio.sleep(site.check_interval)
def __call__(self) -> asyncio.Future:
def create_task(site) -> asyncio.Task:
return self.loop.create_task(self.check_forever(site))
tasks = map(create_task, self.config.sites)
return asyncio.gather(*tasks)
class Producer(Service):
"""
Kafka producer.
Reads from the queue that :class:`chweb.collector.Collector` writes in and
sends all messages in a kafka topic.
"""
async def produce(self):
"""
Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop
@ -83,13 +98,5 @@ class Collector(Application):
self.logger.warning("Kafka producer destroyed!")
await producer.stop()
def run(self):
"""
Runs all tasks in the event loop.
"""
def create_task(site) -> asyncio.Task:
return self.loop.create_task(self.check_forever(site))
tasks = list(map(create_task, self.config.sites))
tasks.append(self.loop.create_task(self.produce()))
self.loop.run_until_complete(asyncio.gather(*tasks))
self.logger.info("Checker stopped ...")
def __call__(self) -> asyncio.Future:
return self.produce()

View File

@ -9,14 +9,14 @@ from typing import Any, Dict
import aiokafka # type: ignore
import asyncpg # type: ignore
from chweb.base import Application
from chweb.base import Service
from chweb.models import Check
class Consumer(Application):
class Consumer(Service):
async def consume(self):
"""
Consumes messages from a Kafka topic.
Consumes messages from a kafka topic and writes them in the database.
"""
consumer = aiokafka.AIOKafkaConsumer(
self.config.kafka.topic,
@ -25,33 +25,24 @@ class Consumer(Application):
await consumer.start()
try:
# Consume messages
# Consume messages from the kafka topic.
async for msg in consumer:
self.queue.put_nowait(Check(**json.loads(msg.value)))
check_info = Check(**json.loads(msg.value))
self.queue.put_nowait(check_info)
self.logger.info(check_info)
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
def __call__(self) -> asyncio.Future:
return self.consume()
async def save(self, pool, data):
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
async def write(self):
class Db:
async def consume_and_save(self):
try:
while True:
status = await self.queue.get()
print(status)
yield status
finally:
print("EXITED!")
def run(self):
"""
Runs all tasks in the event loop.
"""
tasks = [
self.loop.create_task(self.consume()),
self.loop.create_task(self.write()),
]
self.loop.run_until_complete(asyncio.gather(*tasks))
self.logger.info("Queue reader stopped.")