Add unit tests with 68% coverage, refactor a bit
This commit is contained in:
parent
7967b1d024
commit
2b9435dddd
9 changed files with 213 additions and 39 deletions
0
src/chweb/__init__.py
Normal file
0
src/chweb/__init__.py
Normal file
|
@ -6,7 +6,7 @@ import asyncio
|
|||
import logging
|
||||
import logging.config
|
||||
from logging import Logger
|
||||
from typing import Tuple
|
||||
from typing import Any, Dict, Tuple
|
||||
import os
|
||||
import yaml
|
||||
|
||||
|
@ -27,6 +27,15 @@ def configure(name) -> Tuple[Config, Logger]:
|
|||
'Defaults to /etc/checker.yaml'))
|
||||
args = parser.parse_args()
|
||||
|
||||
with open(args.config, 'r') as conf_file:
|
||||
config = yaml.load(conf_file, Loader=yaml.FullLoader)
|
||||
logging.config.dictConfig(config['logging'])
|
||||
|
||||
logger = logging.getLogger("chweb.{}".format(name))
|
||||
return (config, logger)
|
||||
|
||||
|
||||
def create_config(conf: Dict[str, Any]):
|
||||
kafka_servers_env = os.getenv('KAFKA_SERVERS')
|
||||
if kafka_servers_env is not None:
|
||||
kafka_servers = kafka_servers_env.split(',')
|
||||
|
@ -39,23 +48,18 @@ def configure(name) -> Tuple[Config, Logger]:
|
|||
pg_user = os.getenv('POSTGRES_USER')
|
||||
pg_pass = os.getenv('POSTGRES_PASS')
|
||||
|
||||
with open(args.config, 'r') as conf_file:
|
||||
config = yaml.load(conf_file, Loader=yaml.FullLoader)
|
||||
logging.config.dictConfig(config['logging'])
|
||||
config = Config(**conf)
|
||||
config.kafka.servers = (kafka_servers if kafka_servers_env
|
||||
else config.kafka.servers)
|
||||
config.kafka.topic = kafka_topic or config.kafka.topic
|
||||
config.postgres.dbhost = pg_host or config.postgres.dbhost
|
||||
config.postgres.dbname = pg_db or config.postgres.dbname
|
||||
config.postgres.dbport = (int(pg_port) if pg_port is not None
|
||||
else config.postgres.dbport)
|
||||
config.postgres.dbuser = pg_user or config.postgres.dbuser
|
||||
config.postgres.dbpass = pg_pass or config.postgres.dbpass
|
||||
|
||||
config = Config(**config)
|
||||
config.kafka.servers = (kafka_servers if kafka_servers_env
|
||||
else config.kafka.servers)
|
||||
config.kafka.topic = kafka_topic or config.kafka.topic
|
||||
config.postgres.dbhost = pg_host or config.postgres.dbhost
|
||||
config.postgres.dbname = pg_db or config.postgres.dbname
|
||||
config.postgres.dbport = pg_port or config.postgres.dbport
|
||||
config.postgres.dbuser = pg_user or config.postgres.dbuser
|
||||
config.postgres.dbpass = pg_pass or config.postgres.dbpass
|
||||
|
||||
logger = logging.getLogger("chweb.{}".format(name))
|
||||
print(config)
|
||||
return (config, logger)
|
||||
return config
|
||||
|
||||
|
||||
def collect():
|
||||
|
|
|
@ -2,16 +2,17 @@
|
|||
Checks status of web servers and sends them to a configured Kafka topic.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from typing import Optional
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import aiokafka # type: ignore
|
||||
import aiokafka # type: ignore
|
||||
import requests
|
||||
from requests import ConnectionError
|
||||
from requests import exceptions as rqexc
|
||||
|
||||
from chweb.base import Service
|
||||
from chweb.models import Check, SiteConfig
|
||||
from chweb.models import Check, Config, SiteConfig
|
||||
|
||||
|
||||
class Collector(Service):
|
||||
|
@ -30,8 +31,8 @@ class Collector(Service):
|
|||
``chweb.collector.Collector.check_forever``.
|
||||
"""
|
||||
res = await self.loop.run_in_executor(None, requests.get, url)
|
||||
matches = None # The matches value should be None since the regex can
|
||||
# be ommited from the config.
|
||||
matches = None # The matches value should be None since the regex can
|
||||
# be ommited from the config.
|
||||
if regex is not None:
|
||||
matches = re.search(regex, res.text) is not None
|
||||
return Check(
|
||||
|
@ -57,10 +58,10 @@ class Collector(Service):
|
|||
while True:
|
||||
try:
|
||||
data = await self.check(site.url, site.regex)
|
||||
except ConnectionError as exc:
|
||||
except rqexc.ConnectionError as exc:
|
||||
errmsg = "{}; {}".format(site.url, exc)
|
||||
self.logger.error(errmsg)
|
||||
break # Break the loop and destroy the Task.
|
||||
break # Break the loop and destroy the Task.
|
||||
self.queue.put_nowait(data)
|
||||
await asyncio.sleep(site.check_interval)
|
||||
|
||||
|
@ -75,28 +76,36 @@ class Producer(Service):
|
|||
"""
|
||||
Kafka producer.
|
||||
|
||||
Reads from the queue that :class:`chweb.collector.Collector` writes in and
|
||||
sends all messages in a kafka topic.
|
||||
Reads checks from the queue written by :class:`chweb.collector.Collector`
|
||||
and sends all messages in a kafka topic.
|
||||
"""
|
||||
async def produce(self):
|
||||
"""
|
||||
Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop
|
||||
that reads from the queue and sends the messages to the topic from the
|
||||
config.
|
||||
"""
|
||||
producer = aiokafka.AIOKafkaProducer(
|
||||
|
||||
def __init__(self, config: Config,
|
||||
logger: logging.Logger,
|
||||
event_loop: asyncio.AbstractEventLoop,
|
||||
queue: asyncio.Queue):
|
||||
super().__init__(config, logger, event_loop, queue)
|
||||
self.producer = aiokafka.AIOKafkaProducer(
|
||||
loop=self.loop,
|
||||
bootstrap_servers=self.config.kafka.servers)
|
||||
|
||||
await producer.start()
|
||||
async def produce(self):
|
||||
"""
|
||||
Creates and starts an ``aiokafka.AIOKafkaProducer`` and runs a loop
|
||||
that reads from the queue and sends the messages to the topic defined
|
||||
in the config.
|
||||
"""
|
||||
await self.producer.start()
|
||||
try:
|
||||
while True:
|
||||
check = await self.queue.get()
|
||||
msg = bytes(check.json().encode("utf-8"))
|
||||
await producer.send_and_wait(self.config.kafka.topic, msg)
|
||||
await self.producer.send_and_wait(self.config.kafka.topic, msg)
|
||||
except Exception as exc:
|
||||
self.logger.error(exc)
|
||||
finally:
|
||||
self.logger.warning("Kafka producer destroyed!")
|
||||
await producer.stop()
|
||||
await self.producer.stop()
|
||||
|
||||
def __call__(self) -> asyncio.Future:
|
||||
return self.produce()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue