diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5942588 --- /dev/null +++ b/.gitignore @@ -0,0 +1,54 @@ +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + +# Distribution / packaging +.Python +build/ +develop-eggs/ +dist/ +downloads/ +eggs/ +.eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +wheels/ +share/python-wheels/ +*.egg-info/ +.installed.cfg +*.egg +MANIFEST + +# Unit test / coverage reports +.tox/ +.nox/ +.coverage +.coverage.* +.cache +*.cover +*.py,cover +.hypothesis/ +.pytest_cache/ +cover/ +reports/ + +# Sphinx documentation +docs/_build/ + +# Environments +.env +.venv +env/ +venv/ +ENV/ +env.bak/ +venv.bak/ + +# mypy +.mypy_cache/ +.dmypy.json +dmypy.json diff --git a/LICENCE b/LICENCE new file mode 100644 index 0000000..7bd1376 --- /dev/null +++ b/LICENCE @@ -0,0 +1,12 @@ +Copyright (C) 2020 by Vladan + +Permission to use, copy, modify, and/or distribute this software for any purpose +with or without fee is hereby granted. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH +REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND +FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT, +INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS +OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER +TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF +THIS SOFTWARE. diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..6c112f5 --- /dev/null +++ b/README.rst @@ -0,0 +1,5 @@ +============================ +Website stats collector demo +============================ + +... diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..19810a1 --- /dev/null +++ b/config.yaml @@ -0,0 +1,12 @@ +kafka_servers: +- "localhost:9992" +kafka_topic: "sample" +sites: +- url: "https://example.com" + regex: "domain" + check_interval: 5 +- url: "https://example.com" + regex: "aaaaaaaaaaaaa" + check_interval: 8 +- url: "https://example.com/404" + check_interval: 13 diff --git a/reports/.keep b/reports/.keep new file mode 100644 index 0000000..e69de29 diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..9c558e3 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +. diff --git a/setup.cfg b/setup.cfg new file mode 100644 index 0000000..53785b2 --- /dev/null +++ b/setup.cfg @@ -0,0 +1,26 @@ +[metadata] +name = webstat +summary = Tool for collecting website stats. +description-file = README.rst +author = Vladan Popovic +author-email = vladanovic@gmail.com +classifier = + Environment :: Automation + Operating System :: POSIX :: Linux + Programming Language :: Python :: 3.8 + +[options] +package_dir= + =src +packages=find: + +[options.packages.find] +where=src + +[options.entry_points] +console_scripts = + wstat_collect = webstat.cmd:collect + wstat_consume = webstat.cmd:consume + +[bdist_wheel] +universal = 1 diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..c2c71af --- /dev/null +++ b/setup.py @@ -0,0 +1,18 @@ +""" +Setup file for the web stats collector. +""" +from setuptools import setup # type: ignore + + +setup( + use_scm_version=True, + setup_requires=['setuptools_scm'], + python_requires='>=3.8, <4', + install_requires=[ + 'aiokafka==0.6.0', + 'asyncpg==0.21.0', + 'PyYAML==5.3.1', + 'requests==2.24.0', + ], + include_package_data=True, +) diff --git a/src/webstat/cmd.py b/src/webstat/cmd.py new file mode 100644 index 0000000..7b49000 --- /dev/null +++ b/src/webstat/cmd.py @@ -0,0 +1,34 @@ +""" +A module containing all console script functions. +""" +import asyncio +import yaml + +from webstat.collector import Collector +from webstat.consumer import Consumer + + +def run(Service): + """ + A factory kinda that runs both services in an event loop. + """ + loop = asyncio.get_event_loop() + queue = asyncio.Queue() + with open('config.yaml', 'r') as conf_file: + config = yaml.load(conf_file, Loader=yaml.FullLoader) + tasks = Service(config, loop, queue).tasks() + loop.run_until_complete(asyncio.gather(*tasks)) + + +def collect(): + """ + Main producer event loop. + """ + run(Collector) + + +def consume(): + """ + Main consumer event loop. + """ + run(Consumer) diff --git a/src/webstat/collector.py b/src/webstat/collector.py new file mode 100644 index 0000000..9c7a6c7 --- /dev/null +++ b/src/webstat/collector.py @@ -0,0 +1,86 @@ +""" +Checks status of web servers and sends them to a configured Kafka topic. +""" +import asyncio +import json +import re +from typing import Any, Dict, List, Optional + +import aiokafka # type: ignore +import requests + + +class Collector: + """ + A class that contains all methods needed to check the statuses of all + websites present in the config. + """ + def __init__(self, config: Dict[str, Any], + event_loop: asyncio.AbstractEventLoop, + queue: asyncio.Queue): + self.config = config + self.loop = event_loop + self.queue = queue + + async def get_status(self, url: str, regex: Optional[str]) -> Dict[str, Any]: + """ + Checks the status of a website and optionally matches a regex on the + response body. + + :param url: The URL of the site that needs to be checked. + :param regex: An optional regex to match on the response body. + :returns: A dict ready to be sent to the queue for further processing. + """ + res = await self.loop.run_in_executor(None, requests.get, url) + matches = None # The matches value should be None since the regex can + # be ommited from the config. + if regex is not None: + matches = re.search(regex, res.text) is not None + return { + 'url': url, + 'regex': regex, + 'status': res.status_code, + 'response_time': res.elapsed.microseconds, + 'regex_matches': matches, + } + + async def create_periodic_task(self, site): + """ + A void function that gets the status of a site and sends it to an + ``asyncio.Queue`` for further processing (sending to a Kafka topic). + + :param site: A site object from the config. + """ + while True: + data = await self.get_status(site["url"], site.get("regex")) + self.queue.put_nowait(data) + await asyncio.sleep(site["check_interval"]) + + async def produce(self): + """ + Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop that + reads from the ``queue`` and sends the messages to the topic from the + ``config``. + """ + producer = aiokafka.AIOKafkaProducer( + loop=self.loop, + bootstrap_servers=self.config["kafka_servers"]) + + await producer.start() + try: + while True: + status = await self.queue.get() + msg = bytes(json.dumps(status).encode("utf-8")) + await producer.send_and_wait(self.config["kafka_topic"], msg) + finally: + await producer.stop() + + def tasks(self) -> List[asyncio.Task]: + """ + Creates a task for every site. + """ + def create_task(site) -> asyncio.Task: + return self.loop.create_task(self.create_periodic_task(site)) + tasks = list(map(create_task, self.config["sites"])) + tasks.append(self.loop.create_task(self.produce())) + return tasks diff --git a/src/webstat/consumer.py b/src/webstat/consumer.py new file mode 100644 index 0000000..7043468 --- /dev/null +++ b/src/webstat/consumer.py @@ -0,0 +1,59 @@ +""" +Sample consumer. +""" +import asyncio +import json +from typing import Any, Dict, List + +import aiokafka # type: ignore +import asyncpg # type: ignore + + +class Consumer: + def __init__(self, config: Dict[str, Any], + event_loop: asyncio.AbstractEventLoop, + queue: asyncio.Queue): + self.config = config + self.loop = event_loop + self.queue = queue + + async def consume(self): + """ + Consumes messages from a Kafka topic. + """ + consumer = aiokafka.AIOKafkaConsumer( + self.config['kafka_topic'], + loop=self.loop, + bootstrap_servers=self.config['kafka_servers']) + + await consumer.start() + try: + # Consume messages + async for msg in consumer: + self.queue.put_nowait(json.loads(msg.value)) + finally: + # Will leave consumer group; perform autocommit if enabled. + await consumer.stop() + + + async def save(self, pool, data): + async with pool.acquire() as conn: + async with conn.cursor() as cur: + await cur.execute("SELECT 1") + + async def write(self): + try: + while True: + status = await self.queue.get() + print(status) + finally: + print("EXITED!") + + def tasks(self) -> List[asyncio.Task]: + """ + Creates tasks for reading from the Kafka topic and writing in + PostgreSQL. + """ + kafka_consumer = self.loop.create_task(self.consume()) + psql_writer = self.loop.create_task(self.write()) + return [kafka_consumer, psql_writer] diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..e190c76 --- /dev/null +++ b/tox.ini @@ -0,0 +1,36 @@ +[tox] +envlist = clean,lint,py3,report + +[testenv] +deps = + mock + pytest + pytest-cov + pytest-mock +commands = + pytest --cov=webstat --cov-append --cov-report=term-missing {posargs} + +[testenv:lint] +deps = pylint +whitelist_externals = bash +commands = + bash -c "pylint --output-format=parseable src/ | tee reports/pylint.out" + +[testenv:report] +deps = coverage +skip_install = true +commands = + coverage report + coverage html -d reports/htmlcov + coverage xml -o reports/coverage.xml + +[testenv:docs] +changedir = docs +deps = sphinx +commands = + sphinx-build -W -b html -E ./source/ ./build/ + +[testenv:clean] +deps = coverage +skip_install = true +commands = coverage erase