Add logging
This commit is contained in:
parent
60a944cf5e
commit
916fcf1bab
3 changed files with 39 additions and 6 deletions
19
config.yaml
19
config.yaml
|
@ -9,7 +9,7 @@ postgres:
|
||||||
dbuser: "vladan"
|
dbuser: "vladan"
|
||||||
dbpass: ""
|
dbpass: ""
|
||||||
sites:
|
sites:
|
||||||
- url: "https://example.com"
|
- url: "https://dsadakjhkjsahkjh.com"
|
||||||
regex: "domain"
|
regex: "domain"
|
||||||
check_interval: 5
|
check_interval: 5
|
||||||
- url: "https://example.com"
|
- url: "https://example.com"
|
||||||
|
@ -17,3 +17,20 @@ sites:
|
||||||
check_interval: 8
|
check_interval: 8
|
||||||
- url: "https://example.com/404"
|
- url: "https://example.com/404"
|
||||||
check_interval: 13
|
check_interval: 13
|
||||||
|
logging:
|
||||||
|
version: 1
|
||||||
|
formatters:
|
||||||
|
standard:
|
||||||
|
format: "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
|
||||||
|
error:
|
||||||
|
format: "%(levelname)s <PID %(process)d:%(processName)s> %(name)s.%(funcName)s(): %(message)s"
|
||||||
|
handlers:
|
||||||
|
console:
|
||||||
|
class: logging.StreamHandler
|
||||||
|
level: DEBUG
|
||||||
|
formatter: standard
|
||||||
|
stream: ext://sys.stdout
|
||||||
|
root:
|
||||||
|
level: DEBUG
|
||||||
|
handlers: [console]
|
||||||
|
propogate: yes
|
||||||
|
|
|
@ -3,6 +3,10 @@ A module containing all console script functions.
|
||||||
"""
|
"""
|
||||||
import argparse
|
import argparse
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import logging
|
||||||
|
import logging.config
|
||||||
|
from logging import Logger
|
||||||
|
from typing import Tuple
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
from chweb.collector import Collector
|
from chweb.collector import Collector
|
||||||
|
@ -10,7 +14,7 @@ from chweb.consumer import Consumer
|
||||||
from chweb.models import Config
|
from chweb.models import Config
|
||||||
|
|
||||||
|
|
||||||
def configure() -> Config:
|
def configure(name) -> Tuple[Config, Logger]:
|
||||||
"""
|
"""
|
||||||
Gets the configuration and creates a Pydantic model from the parsed YAML.
|
Gets the configuration and creates a Pydantic model from the parsed YAML.
|
||||||
"""
|
"""
|
||||||
|
@ -23,7 +27,9 @@ def configure() -> Config:
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
with open(args.config, 'r') as conf_file:
|
with open(args.config, 'r') as conf_file:
|
||||||
config = yaml.load(conf_file, Loader=yaml.FullLoader)
|
config = yaml.load(conf_file, Loader=yaml.FullLoader)
|
||||||
return Config(**config)
|
logging.config.dictConfig(config['logging'])
|
||||||
|
logger = logging.getLogger("chweb.{}".format(name))
|
||||||
|
return (Config(**config), logger)
|
||||||
|
|
||||||
|
|
||||||
def run(Service):
|
def run(Service):
|
||||||
|
@ -32,8 +38,11 @@ def run(Service):
|
||||||
"""
|
"""
|
||||||
loop = asyncio.get_event_loop()
|
loop = asyncio.get_event_loop()
|
||||||
queue = asyncio.Queue()
|
queue = asyncio.Queue()
|
||||||
config = configure()
|
config, logger = configure(Service.__name__)
|
||||||
service = Service(config, loop, queue)
|
logger.info(("Starting service on kafka [cluster]/topic: "
|
||||||
|
"{}/{}").format(config.kafka.servers,
|
||||||
|
config.kafka.topic))
|
||||||
|
service = Service(config, logger, loop, queue)
|
||||||
service.run()
|
service.run()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -56,7 +56,12 @@ class Collector:
|
||||||
:param site: A site object from the config.
|
:param site: A site object from the config.
|
||||||
"""
|
"""
|
||||||
while True:
|
while True:
|
||||||
|
try:
|
||||||
data = await self.check(site.url, site.regex)
|
data = await self.check(site.url, site.regex)
|
||||||
|
except Exception as exc:
|
||||||
|
errmsg = "{}; {}".format(site.url, exc)
|
||||||
|
self.logger.error(errmsg)
|
||||||
|
break # Break the loop and destroy the Task.
|
||||||
self.queue.put_nowait(data)
|
self.queue.put_nowait(data)
|
||||||
await asyncio.sleep(site.check_interval)
|
await asyncio.sleep(site.check_interval)
|
||||||
|
|
||||||
|
@ -77,6 +82,7 @@ class Collector:
|
||||||
msg = bytes(check.json().encode("utf-8"))
|
msg = bytes(check.json().encode("utf-8"))
|
||||||
await producer.send_and_wait(self.config.kafka.topic, msg)
|
await producer.send_and_wait(self.config.kafka.topic, msg)
|
||||||
finally:
|
finally:
|
||||||
|
self.logger.warning("Kafka producer destroyed!")
|
||||||
await producer.stop()
|
await producer.stop()
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
|
@ -88,3 +94,4 @@ class Collector:
|
||||||
tasks = list(map(create_task, self.config.sites))
|
tasks = list(map(create_task, self.config.sites))
|
||||||
tasks.append(self.loop.create_task(self.produce()))
|
tasks.append(self.loop.create_task(self.produce()))
|
||||||
self.loop.run_until_complete(asyncio.gather(*tasks))
|
self.loop.run_until_complete(asyncio.gather(*tasks))
|
||||||
|
self.logger.info("Checker stopped ...")
|
||||||
|
|
Loading…
Reference in a new issue