From 2b9435dddddb3128008b61659947cec24fb7b6a8 Mon Sep 17 00:00:00 2001 From: Vladan Popovic Date: Sun, 6 Sep 2020 00:23:34 +0200 Subject: [PATCH] Add unit tests with 68% coverage, refactor a bit --- docs/source/conf.py | 3 -- src/chweb/__init__.py | 0 src/chweb/cmd.py | 38 +++++++++++++----------- src/chweb/collector.py | 47 +++++++++++++++++------------ tests/__init__.py | 0 tests/conftest.py | 49 ++++++++++++++++++++++++++++++ tests/test_checker.py | 67 ++++++++++++++++++++++++++++++++++++++++++ tests/test_producer.py | 47 +++++++++++++++++++++++++++++ tox.ini | 1 + 9 files changed, 213 insertions(+), 39 deletions(-) create mode 100644 src/chweb/__init__.py create mode 100644 tests/__init__.py create mode 100644 tests/conftest.py create mode 100644 tests/test_checker.py create mode 100644 tests/test_producer.py diff --git a/docs/source/conf.py b/docs/source/conf.py index 4166f67..221e89c 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -15,9 +15,6 @@ import sys import sphinx_typlog_theme -sys.path.insert(0, os.path.join(os.path.abspath('.'), - "..", "..", "src")) - # -- Project information ----------------------------------------------------- project = 'chweb' diff --git a/src/chweb/__init__.py b/src/chweb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/chweb/cmd.py b/src/chweb/cmd.py index 29724e6..a819299 100644 --- a/src/chweb/cmd.py +++ b/src/chweb/cmd.py @@ -6,7 +6,7 @@ import asyncio import logging import logging.config from logging import Logger -from typing import Tuple +from typing import Any, Dict, Tuple import os import yaml @@ -27,6 +27,15 @@ def configure(name) -> Tuple[Config, Logger]: 'Defaults to /etc/checker.yaml')) args = parser.parse_args() + with open(args.config, 'r') as conf_file: + config = yaml.load(conf_file, Loader=yaml.FullLoader) + logging.config.dictConfig(config['logging']) + + logger = logging.getLogger("chweb.{}".format(name)) + return (config, logger) + + +def create_config(conf: Dict[str, Any]): kafka_servers_env = os.getenv('KAFKA_SERVERS') if kafka_servers_env is not None: kafka_servers = kafka_servers_env.split(',') @@ -39,23 +48,18 @@ def configure(name) -> Tuple[Config, Logger]: pg_user = os.getenv('POSTGRES_USER') pg_pass = os.getenv('POSTGRES_PASS') - with open(args.config, 'r') as conf_file: - config = yaml.load(conf_file, Loader=yaml.FullLoader) - logging.config.dictConfig(config['logging']) + config = Config(**conf) + config.kafka.servers = (kafka_servers if kafka_servers_env + else config.kafka.servers) + config.kafka.topic = kafka_topic or config.kafka.topic + config.postgres.dbhost = pg_host or config.postgres.dbhost + config.postgres.dbname = pg_db or config.postgres.dbname + config.postgres.dbport = (int(pg_port) if pg_port is not None + else config.postgres.dbport) + config.postgres.dbuser = pg_user or config.postgres.dbuser + config.postgres.dbpass = pg_pass or config.postgres.dbpass - config = Config(**config) - config.kafka.servers = (kafka_servers if kafka_servers_env - else config.kafka.servers) - config.kafka.topic = kafka_topic or config.kafka.topic - config.postgres.dbhost = pg_host or config.postgres.dbhost - config.postgres.dbname = pg_db or config.postgres.dbname - config.postgres.dbport = pg_port or config.postgres.dbport - config.postgres.dbuser = pg_user or config.postgres.dbuser - config.postgres.dbpass = pg_pass or config.postgres.dbpass - - logger = logging.getLogger("chweb.{}".format(name)) - print(config) - return (config, logger) + return config def collect(): diff --git a/src/chweb/collector.py b/src/chweb/collector.py index a37b9c6..105ab1a 100644 --- a/src/chweb/collector.py +++ b/src/chweb/collector.py @@ -2,16 +2,17 @@ Checks status of web servers and sends them to a configured Kafka topic. """ import asyncio +import logging import re from typing import Optional from urllib.parse import urlparse -import aiokafka # type: ignore +import aiokafka # type: ignore import requests -from requests import ConnectionError +from requests import exceptions as rqexc from chweb.base import Service -from chweb.models import Check, SiteConfig +from chweb.models import Check, Config, SiteConfig class Collector(Service): @@ -30,8 +31,8 @@ class Collector(Service): ``chweb.collector.Collector.check_forever``. """ res = await self.loop.run_in_executor(None, requests.get, url) - matches = None # The matches value should be None since the regex can - # be ommited from the config. + matches = None # The matches value should be None since the regex can + # be ommited from the config. if regex is not None: matches = re.search(regex, res.text) is not None return Check( @@ -57,10 +58,10 @@ class Collector(Service): while True: try: data = await self.check(site.url, site.regex) - except ConnectionError as exc: + except rqexc.ConnectionError as exc: errmsg = "{}; {}".format(site.url, exc) self.logger.error(errmsg) - break # Break the loop and destroy the Task. + break # Break the loop and destroy the Task. self.queue.put_nowait(data) await asyncio.sleep(site.check_interval) @@ -75,28 +76,36 @@ class Producer(Service): """ Kafka producer. - Reads from the queue that :class:`chweb.collector.Collector` writes in and - sends all messages in a kafka topic. + Reads checks from the queue written by :class:`chweb.collector.Collector` + and sends all messages in a kafka topic. """ - async def produce(self): - """ - Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop - that reads from the queue and sends the messages to the topic from the - config. - """ - producer = aiokafka.AIOKafkaProducer( + + def __init__(self, config: Config, + logger: logging.Logger, + event_loop: asyncio.AbstractEventLoop, + queue: asyncio.Queue): + super().__init__(config, logger, event_loop, queue) + self.producer = aiokafka.AIOKafkaProducer( loop=self.loop, bootstrap_servers=self.config.kafka.servers) - await producer.start() + async def produce(self): + """ + Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop + that reads from the queue and sends the messages to the topic defined + in the config. + """ + await self.producer.start() try: while True: check = await self.queue.get() msg = bytes(check.json().encode("utf-8")) - await producer.send_and_wait(self.config.kafka.topic, msg) + await self.producer.send_and_wait(self.config.kafka.topic, msg) + except Exception as exc: + self.logger.error(exc) finally: self.logger.warning("Kafka producer destroyed!") - await producer.stop() + await self.producer.stop() def __call__(self) -> asyncio.Future: return self.produce() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..5ddc26e --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,49 @@ +import asyncio +import pytest +from chweb.cmd import create_config + +@pytest.fixture() +def config(): + config_dict = { + 'kafka': { + 'servers': ["localhost:9992"], + 'topic': "sample", + }, + 'postgres': { + 'dbhost': "localhost", + 'dbport': 5432, + 'dbname': "chweb", + 'dbuser': "vladan", + 'dbpass': "", + }, + 'sites': [{ + 'url': "https://example.com", + 'regex': "aaaaaaaaaaaaa", + 'check_interval': 8, + }, + ] + } + return create_config(config_dict) + +@pytest.fixture +def config_invalid(): + config_dict = { + 'kafka': { + 'servers': ["localhost:9992"], + 'topic': "sample", + }, + 'postgres': { + 'dbhost': "localhost", + 'dbport': 5432, + 'dbname': "chweb", + 'dbuser': "vladan", + 'dbpass': "", + }, + 'sites': [{ + 'url': "https://dsadakjhkjsahkjh.com", + 'regex': "domain", + 'check_interval': 5, + }, + ] + } + return create_config(config_dict) diff --git a/tests/test_checker.py b/tests/test_checker.py new file mode 100644 index 0000000..13d9792 --- /dev/null +++ b/tests/test_checker.py @@ -0,0 +1,67 @@ +""" +All tests fot the ``chweb.checker`` module. +""" +import asyncio +from mock import Mock + +import pytest +import requests + +from chweb.collector import Collector + + +@pytest.mark.asyncio +async def test_valid_site_200(config, event_loop): + queue = asyncio.Queue() + coll = Collector(config, Mock(), event_loop, queue) + check = await coll.check('https://example.com', None) + assert check.domain == 'example.com' + assert check.regex_matches is None + assert check.status == 200 + assert check.response_time > 0 + + +@pytest.mark.asyncio +async def test_valid_site_404(config, event_loop): + queue = asyncio.Queue() + coll = Collector(config, Mock(), event_loop, queue) + check = await coll.check('https://example.com/404', None) + assert check.domain == 'example.com' + assert check.regex_matches is None + assert check.status == 404 + assert check.response_time > 0 + + +@pytest.mark.asyncio +async def test_invalid_site(config, event_loop): + queue = asyncio.Queue() + coll = Collector(config, Mock(), event_loop, queue) + with pytest.raises(requests.exceptions.ConnectionError): + _ = await coll.check('https://non.existant.domain.noooo', None) + + +@pytest.mark.asyncio +async def test_check_forever_valid(config, event_loop): + """ + The :meth:`chweb.collector.Collector.check_forever` method runs an infinite + loop, so we'll test if it's running for 2s and assume it's ok. + """ + queue = asyncio.Queue() + coll = Collector(config, Mock(), event_loop, queue) + task = event_loop.create_task(coll.check_forever(config.sites[0])) + await asyncio.sleep(2) + assert not task.done() + task.cancel() + + +@pytest.mark.asyncio +async def test_check_forever_invalid(config_invalid, event_loop): + """ + The :meth:`chweb.collector.Collector.check_forever` method cancels the Task + on error, so if we get an invalid site, the task should be done. + """ + queue = asyncio.Queue() + coll = Collector(config_invalid, Mock(), event_loop, queue) + task = event_loop.create_task(coll.check_forever(config_invalid.sites[0])) + await asyncio.sleep(1) + assert task.done() diff --git a/tests/test_producer.py b/tests/test_producer.py new file mode 100644 index 0000000..6f61bec --- /dev/null +++ b/tests/test_producer.py @@ -0,0 +1,47 @@ +import asyncio + +import aiokafka +from mock import Mock +import pytest + +from chweb.collector import Producer +from chweb.models import Check + + +@pytest.mark.asyncio +async def test_producer_called(config, event_loop): + queue = asyncio.Queue() + producer = Producer(config, Mock(), event_loop, queue) + check = Check() + await queue.put(check) + + async def async_patch(): + pass + Mock.__await__ = lambda x: async_patch().__await__() + + producer.producer = Mock() + + task = event_loop.create_task(producer.produce()) + await asyncio.sleep(0) + producer.producer.send_and_wait.assert_called_with( + config.kafka.topic, bytes(check.json().encode('utf-8'))) + task.cancel() + + +@pytest.mark.asyncio +async def test_producer_called_invalid(config, event_loop): + queue = asyncio.Queue() + producer = Producer(config, Mock(), event_loop, queue) + check = Check() + await queue.put('') + + async def async_patch(): + pass + Mock.__await__ = lambda x: async_patch().__await__() + + producer.producer = Mock() + + task = event_loop.create_task(producer.produce()) + await asyncio.sleep(0) + producer.logger.error.assert_called() + assert task.done() diff --git a/tox.ini b/tox.ini index c740a5d..db175d3 100644 --- a/tox.ini +++ b/tox.ini @@ -5,6 +5,7 @@ envlist = clean,lint,py3,report deps = mock pytest + pytest-asyncio pytest-cov pytest-mock commands =