Fix formatting, log debug kafka calls
This commit is contained in:
parent
2b9435dddd
commit
3ec541c3f5
3 changed files with 24 additions and 22 deletions
|
@ -28,9 +28,10 @@ def configure(name) -> Tuple[Config, Logger]:
|
||||||
args = parser.parse_args()
|
args = parser.parse_args()
|
||||||
|
|
||||||
with open(args.config, 'r') as conf_file:
|
with open(args.config, 'r') as conf_file:
|
||||||
config = yaml.load(conf_file, Loader=yaml.FullLoader)
|
conf = yaml.load(conf_file, Loader=yaml.FullLoader)
|
||||||
logging.config.dictConfig(config['logging'])
|
logging.config.dictConfig(conf['logging'])
|
||||||
|
|
||||||
|
config = create_config(conf)
|
||||||
logger = logging.getLogger("chweb.{}".format(name))
|
logger = logging.getLogger("chweb.{}".format(name))
|
||||||
return (config, logger)
|
return (config, logger)
|
||||||
|
|
||||||
|
|
|
@ -31,8 +31,9 @@ class Collector(Service):
|
||||||
``chweb.collector.Collector.check_forever``.
|
``chweb.collector.Collector.check_forever``.
|
||||||
"""
|
"""
|
||||||
res = await self.loop.run_in_executor(None, requests.get, url)
|
res = await self.loop.run_in_executor(None, requests.get, url)
|
||||||
matches = None # The matches value should be None since the regex can
|
# Should be treated as nullable since the regex can be ommited from the
|
||||||
# be ommited from the config.
|
# config.
|
||||||
|
matches = None
|
||||||
if regex is not None:
|
if regex is not None:
|
||||||
matches = re.search(regex, res.text) is not None
|
matches = re.search(regex, res.text) is not None
|
||||||
return Check(
|
return Check(
|
||||||
|
@ -100,6 +101,7 @@ class Producer(Service):
|
||||||
while True:
|
while True:
|
||||||
check = await self.queue.get()
|
check = await self.queue.get()
|
||||||
msg = bytes(check.json().encode("utf-8"))
|
msg = bytes(check.json().encode("utf-8"))
|
||||||
|
self.logger.debug(check)
|
||||||
await self.producer.send_and_wait(self.config.kafka.topic, msg)
|
await self.producer.send_and_wait(self.config.kafka.topic, msg)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
self.logger.error(exc)
|
self.logger.error(exc)
|
||||||
|
|
|
@ -11,15 +11,13 @@ import aiokafka # type: ignore
|
||||||
import asyncpg # type: ignore
|
import asyncpg # type: ignore
|
||||||
|
|
||||||
from chweb.base import Service
|
from chweb.base import Service
|
||||||
from chweb.models import Check
|
from chweb.models import Check, PostgresConfig
|
||||||
|
|
||||||
|
|
||||||
class Consumer(Service):
|
class Consumer(Service):
|
||||||
@property
|
@property
|
||||||
def db(self):
|
def db(self):
|
||||||
return Db(self.loop, self.logger, self.config.postgres.dbuser,
|
return Db(self.loop, self.logger, self.config.postgres)
|
||||||
self.config.postgres.dbpass, self.config.postgres.dbhost,
|
|
||||||
self.config.postgres.dbport, self.config.postgres.dbname)
|
|
||||||
|
|
||||||
async def consume(self):
|
async def consume(self):
|
||||||
"""
|
"""
|
||||||
|
@ -38,6 +36,7 @@ class Consumer(Service):
|
||||||
try:
|
try:
|
||||||
check = Check(**json.loads(msg.value))
|
check = Check(**json.loads(msg.value))
|
||||||
self.queue.put_nowait(check)
|
self.queue.put_nowait(check)
|
||||||
|
self.logger.debug(check)
|
||||||
await db.save(check)
|
await db.save(check)
|
||||||
except Exception as exc:
|
except Exception as exc:
|
||||||
err = "error processing message %s; failed with %s"
|
err = "error processing message %s; failed with %s"
|
||||||
|
@ -53,24 +52,25 @@ class Db:
|
||||||
"""
|
"""
|
||||||
Database operations and handy helpers.
|
Database operations and handy helpers.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger,
|
def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger,
|
||||||
user: str, passwd: str, host: str, port: int, dbname: str):
|
pgconf: PostgresConfig):
|
||||||
self.loop = loop
|
self.loop = loop
|
||||||
self.logger = logger
|
self.logger = logger
|
||||||
# Do a side effect here since without this there's not any point for
|
# Do a side effect here since without this there's not any point for
|
||||||
# the application to start. Applies for tests as well.
|
# the application to start. Applies for tests as well.
|
||||||
self.conn: Optional[asyncpg.Connection] = None
|
self.conn: Optional[asyncpg.Connection] = None
|
||||||
self.user = user
|
self.conf = pgconf
|
||||||
self.passwd = passwd
|
|
||||||
self.host = host
|
|
||||||
self.port = port
|
|
||||||
self.dbname = dbname
|
|
||||||
|
|
||||||
async def __aenter__(self):
|
async def __aenter__(self):
|
||||||
self.conn = await asyncpg.connect(host=self.host, port=self.port,
|
self.conn = await asyncpg.connect(
|
||||||
user=self.user, password=self.passwd,
|
host=self.conf.dbhost,
|
||||||
database=self.dbname, loop=self.loop,
|
port=self.conf.dbport,
|
||||||
timeout=60)
|
user=self.conf.dbuser,
|
||||||
|
password=self.conf.dbpass,
|
||||||
|
database=self.conf.dbname,
|
||||||
|
loop=self.loop, timeout=60,
|
||||||
|
)
|
||||||
return self
|
return self
|
||||||
|
|
||||||
async def __aexit__(self, type_, value, traceback):
|
async def __aexit__(self, type_, value, traceback):
|
||||||
|
@ -108,7 +108,6 @@ class Db:
|
||||||
Writes a single record in the database. This is not very optimal, a
|
Writes a single record in the database. This is not very optimal, a
|
||||||
better way would be to write a batch of status checks at once.
|
better way would be to write a batch of status checks at once.
|
||||||
"""
|
"""
|
||||||
tstamp = time.mktime(data.request_time.timetuple())
|
|
||||||
if self.conn is not None:
|
if self.conn is not None:
|
||||||
try:
|
try:
|
||||||
await self.conn.execute('''
|
await self.conn.execute('''
|
||||||
|
@ -120,5 +119,5 @@ class Db:
|
||||||
data.request_time, data.response_time, data.status,
|
data.request_time, data.response_time, data.status,
|
||||||
data.url)
|
data.url)
|
||||||
except asyncpg.PostgresError as exc:
|
except asyncpg.PostgresError as exc:
|
||||||
self.logger.error("error in query %s", exc.query)
|
self.logger.error("error in query %s", exc)
|
||||||
raise
|
raise
|
||||||
|
|
Loading…
Reference in a new issue