Increase caution in the consumer
- Fail on invalid data received from the broker. - Remove the context manager since it's an unnecessary complication. - Make most of the fields in the Check model mandatory. - introduce more mandatory fields in the config models.
This commit is contained in:
parent
c6b4296c61
commit
99a99a6122
2 changed files with 22 additions and 26 deletions
|
@ -36,18 +36,17 @@ class Consumer(Service):
|
|||
"""
|
||||
await self.consumer.start()
|
||||
try:
|
||||
async with self.db as db:
|
||||
await db.setup()
|
||||
async for msg in self.consumer:
|
||||
try:
|
||||
check = Check(**json.loads(msg.value))
|
||||
self.logger.debug(check)
|
||||
await db.save(check)
|
||||
except Exception as exc:
|
||||
err = "error processing message %s; failed with %s"
|
||||
self.logger.error(err, msg, exc)
|
||||
await self.db.setup()
|
||||
async for msg in self.consumer:
|
||||
# if anything here fails break the loop and exit since it's
|
||||
# something out of our control and we don't want to work
|
||||
# with broken data
|
||||
check = Check(**json.loads(msg.value))
|
||||
self.logger.debug(check)
|
||||
await self.db.save(check)
|
||||
except Exception as exc:
|
||||
self.logger.error(exc)
|
||||
self.logger.info("Exiting due to previous errors!")
|
||||
finally:
|
||||
await self.consumer.stop()
|
||||
|
||||
|
@ -69,7 +68,10 @@ class Db:
|
|||
self.conn: Optional[asyncpg.Connection] = None
|
||||
self.conf = pgconf
|
||||
|
||||
async def __aenter__(self):
|
||||
async def setup(self):
|
||||
"""
|
||||
Setup the database, i.e. create the table and set up the indexes.
|
||||
"""
|
||||
self.conn = await asyncpg.connect(
|
||||
host=self.conf.dbhost,
|
||||
port=self.conf.dbport,
|
||||
|
@ -78,15 +80,6 @@ class Db:
|
|||
database=self.conf.dbname,
|
||||
loop=self.loop, timeout=60,
|
||||
)
|
||||
return self
|
||||
|
||||
async def __aexit__(self, type_, value, traceback):
|
||||
await self.conn.close()
|
||||
|
||||
async def setup(self):
|
||||
"""
|
||||
Setup the database, i.e. create the table and set up the indexes.
|
||||
"""
|
||||
await self.conn.execute('''
|
||||
CREATE TABLE IF NOT EXISTS statuses(
|
||||
id SERIAL PRIMARY KEY,
|
||||
|
|
|
@ -15,13 +15,16 @@ class Check(BaseModel):
|
|||
"""
|
||||
Information for a website check request.
|
||||
"""
|
||||
domain: str = ""
|
||||
domain: str
|
||||
regex: Optional[str] = None
|
||||
regex_matches: Optional[bool] = None
|
||||
request_time: datetime = datetime.now()
|
||||
response_time: int = 0
|
||||
status: int = 0
|
||||
url: str = ""
|
||||
response_time: int
|
||||
status: int
|
||||
url: str
|
||||
|
||||
class Config:
|
||||
extra = "forbid"
|
||||
|
||||
|
||||
class KafkaConfig(BaseModel):
|
||||
|
@ -48,8 +51,8 @@ class SiteConfig(BaseModel):
|
|||
Single website configuration.
|
||||
"""
|
||||
url: str = "https://example.com"
|
||||
regex: str = "domain"
|
||||
check_interval: int = 5
|
||||
regex: Optional[str] = None
|
||||
check_interval: int
|
||||
|
||||
|
||||
class Config(BaseModel):
|
||||
|
|
Loading…
Reference in a new issue