Batch insert checks in the db (#1)

Vladan Popovic 2020-09-09 00:27:40 +02:00 committed by GitHub
parent 4c3fd293db
commit 0cf6670618
8 changed files with 209 additions and 89 deletions


@@ -13,15 +13,22 @@ postgres:
dbuser: "avnadmin"
dbpass: ""
dbcert: "./certs/pg.pem"
batch_size: 10
sites:
- url: "https://dsadakjhkjsahkjh.com"
regex: "domain"
check_interval: 5
- url: "https://example.com"
regex: "aaaaaaaaaaaaa"
check_interval: 8
- url: "https://example.com/404"
check_interval: 13
regex: "domain"
check_interval: 17
- url: "https://github.com/status"
regex: "^Github lives!.*"
check_interval: 31
- url: "https://medium.com/status"
check_interval: 11
- url: "https://help.netflix.com/en/is-netflix-down"
regex: "Netflix is up"
check_interval: 37
- url: "https://status.aiven.io/"
regex: "All Systems Operational"
check_interval: 43
logging:
version: 1
formatters:
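For context, here is a minimal sketch (not part of the commit) of how the new batch_size key ends up on the Pydantic config; the file name "config.yaml" is an assumption:

# sketch: load the YAML above and read the new batch_size knob
# (the path "config.yaml" is assumed; adjust as needed)
import yaml
from chweb.models import Config

with open("config.yaml") as fp:
    config = Config(**yaml.safe_load(fp))

print(config.postgres.batch_size)  # -> 10, the value set above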


@@ -4,7 +4,7 @@ A module containing all console script functions.
import asyncio
from chweb.collector import Collector, Producer
from chweb.consumer import Consumer
from chweb.consumer import Consumer, DbWriter
from chweb.config import configure
@@ -18,9 +18,11 @@ def collect():
config, logger = configure("collector")
logger.info("Starting collector for %d sites.", len(config.sites))
collector = Collector(config, logger, loop, queue)
logger.info(("Starting kafka producer on kafka [cluster]/topic: "
"%s/%s"), config.kafka.servers, config.kafka.topic)
producer = Producer(config, logger, loop, queue)
loop.run_until_complete(asyncio.gather(collector(), producer()))
@@ -35,4 +37,10 @@ def consume():
logger.info(("Starting kafka consumer on kafka [cluster]/topic: "
"%s/%s"), config.kafka.servers, config.kafka.topic)
consumer = Consumer(config, logger, loop, queue)
loop.run_until_complete(consumer())
logger.info(("Starting database writer to %s@%s:%d/%s"),
config.postgres.dbuser, config.postgres.dbhost,
config.postgres.dbport, config.postgres.dbname)
db_writer = DbWriter(config, logger, loop, queue)
loop.run_until_complete(asyncio.gather(consumer(), db_writer()))
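consume() now runs the Kafka consumer and the database writer concurrently: the consumer puts each check on a shared queue and the writer drains it. A self-contained toy sketch of that wiring (stand-in coroutines, not the real services):

# toy version of the consume() wiring: two coroutines around one queue,
# scheduled together with asyncio.gather
import asyncio

async def produce(queue: asyncio.Queue):
    for i in range(3):
        queue.put_nowait(i)   # stand-in for checks read from Kafka
        await asyncio.sleep(0)

async def drain(queue: asyncio.Queue):
    for _ in range(3):
        print(await queue.get())  # stand-in for the database write

async def main():
    queue = asyncio.Queue()
    await asyncio.gather(produce(queue), drain(queue))

asyncio.run(main())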


@@ -12,9 +12,74 @@ import yaml
from chweb.models import Config
def set_kafka_config_from_env(config: Config):
"""
This function reads all Kafka environment variables and stores them in the
config if they are set.
"""
kafka_servers_env = os.getenv('KAFKA_SERVERS')
if kafka_servers_env is not None:
kafka_servers = kafka_servers_env.split(',')
kafka_topic = os.getenv('KAFKA_TOPIC')
kafka_cafile = os.getenv('KAFKA_CA_FILE')
kafka_cert = os.getenv('KAFKA_CERT')
kafka_key = os.getenv('KAFKA_KEY')
kafka_pass = os.getenv('KAFKA_PASS')
config.kafka.servers = (kafka_servers if kafka_servers_env
else config.kafka.servers)
config.kafka.topic = kafka_topic or config.kafka.topic
config.kafka.cafile = kafka_cafile or config.kafka.cafile
config.kafka.cert = kafka_cert or config.kafka.cert
config.kafka.key = kafka_key or config.kafka.key
config.kafka.passwd = kafka_pass or config.kafka.passwd
def set_postgres_config_from_env(config: Config):
"""
This function reads all Postgres environment variables and store them in
the config if they are set.
It also tries to convert to ``int`` the variables that are required as
ints. If the conversion fails, it falls back to the default values.
"""
pg_db = os.getenv('POSTGRES_DB')
pg_host = os.getenv('POSTGRES_HOST')
pg_port = os.getenv('POSTGRES_PORT')
pg_user = os.getenv('POSTGRES_USER')
pg_pass = os.getenv('POSTGRES_PASS')
pg_cert = os.getenv('POSTGRES_CERT')
batch_size = os.getenv('BATCH_SIZE')
config.postgres.dbhost = pg_host or config.postgres.dbhost
config.postgres.dbname = pg_db or config.postgres.dbname
try:
    config.postgres.dbport = int(pg_port)
except (TypeError, ValueError):
    pass  # unset or not a number; keep the configured default
config.postgres.dbuser = pg_user or config.postgres.dbuser
config.postgres.dbpass = pg_pass or config.postgres.dbpass
config.postgres.dbcert = pg_cert or config.postgres.dbcert
try:
    config.postgres.batch_size = int(batch_size)
except (TypeError, ValueError):
    pass  # unset or not a number; keep the configured default
def create_config(conf: Dict[str, Any]) -> Config:
"""
Creates a :class:`chweb.models.Config` object, updates it with the
environment variables' values (if they are set) for both Kafka and Postgres,
and returns the created :class:`chweb.models.Config` object.
"""
config = Config(**conf)
set_kafka_config_from_env(config)
set_postgres_config_from_env(config)
return config
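Assuming the helpers behave as documented, environment variables win over the YAML values when both are present; an illustrative run (the values are made up):

# illustrative only: env vars override the parsed YAML dict `conf`
import os

os.environ["POSTGRES_PORT"] = "6432"
os.environ["BATCH_SIZE"] = "50"

config = create_config(conf)  # conf: the dict parsed from the YAML file
assert config.postgres.dbport == 6432
assert config.postgres.batch_size == 50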
def configure(name) -> Tuple[Config, Logger]:
"""
Gets the configuration and creates a Pydantic model from the parsed YAML.
This function also sets up the logger and returns it together with the
config.
"""
parser = argparse.ArgumentParser(
description='Website availability checker.')
@@ -31,41 +96,3 @@ def configure(name) -> Tuple[Config, Logger]:
config = create_config(conf)
logger = logging.getLogger("chweb.{}".format(name))
return (config, logger)
def create_config(conf: Dict[str, Any]):
kafka_servers_env = os.getenv('KAFKA_SERVERS')
if kafka_servers_env is not None:
kafka_servers = kafka_servers_env.split(',')
kafka_topic = os.getenv('KAFKA_TOPIC')
kafka_cafile = os.getenv('KAFKA_CA_FILE')
kafka_cert = os.getenv('KAFKA_CERT')
kafka_key = os.getenv('KAFKA_KEY')
kafka_pass = os.getenv('KAFKA_PASS')
pg_db = os.getenv('POSTGRES_DB')
pg_host = os.getenv('POSTGRES_HOST')
pg_port = os.getenv('POSTGRES_PORT')
pg_user = os.getenv('POSTGRES_USER')
pg_pass = os.getenv('POSTGRES_PASS')
pg_cert = os.getenv('POSTGRES_CERT')
config = Config(**conf)
config.kafka.servers = (kafka_servers if kafka_servers_env
else config.kafka.servers)
config.kafka.topic = kafka_topic or config.kafka.topic
config.kafka.cafile = kafka_cafile or config.kafka.cafile
config.kafka.cert = kafka_cert or config.kafka.cert
config.kafka.key = kafka_key or config.kafka.key
config.kafka.passwd = kafka_pass or config.kafka.passwd
config.postgres.dbhost = pg_host or config.postgres.dbhost
config.postgres.dbname = pg_db or config.postgres.dbname
config.postgres.dbport = (int(pg_port) if pg_port is not None
else config.postgres.dbport)
config.postgres.dbuser = pg_user or config.postgres.dbuser
config.postgres.dbpass = pg_pass or config.postgres.dbpass
config.postgres.dbcert = pg_cert or config.postgres.dbcert
return config


@@ -12,7 +12,7 @@ from aiokafka.helpers import create_ssl_context # type: ignore
import asyncpg # type: ignore
from chweb.base import Service
from chweb.models import Check, Config, PostgresConfig
from chweb.models import Check, Config
class Consumer(Service):
@@ -27,7 +27,6 @@ class Consumer(Service):
event_loop: asyncio.AbstractEventLoop,
queue: asyncio.Queue):
super().__init__(config, logger, event_loop, queue)
self.db = Db(self.loop, self.logger, self.config.postgres)
context = create_ssl_context(
cafile=self.config.kafka.cafile,
certfile=self.config.kafka.cert,
@@ -48,14 +47,13 @@ class Consumer(Service):
"""
await self.consumer.start()
try:
await self.db.setup()
async for msg in self.consumer:
# if anything here fails, break the loop and exit, since it's
# something out of our control and we don't want to work
# with broken data
check = Check(**json.loads(msg.value))
self.logger.debug(check)
await self.db.save(check)
self.queue.put_nowait(check)
except Exception as exc:
self.logger.exception(exc)
self.logger.info("Exiting due to previous errors!")
@@ -66,33 +64,42 @@ class Consumer(Service):
return self.consume()
class Db:
class DbWriter(Service):
"""
Writes status checks from the queue to the database in batches.
"""
def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger,
pgconf: PostgresConfig):
self.loop = loop
self.logger = logger
# Do a side effect here since without this there's no point for
# the application to start. Applies to tests as well.
def __init__(self, config: Config,
logger: logging.Logger,
event_loop: asyncio.AbstractEventLoop,
queue: asyncio.Queue):
super().__init__(config, logger, event_loop, queue)
self.conn: Optional[asyncpg.Connection] = None
self.conf = pgconf
async def connect(self):
"""
Connects to the database and stores the connection in ``self.conn``.
"""
try:
self.conn = await asyncpg.connect(
host=self.config.postgres.dbhost,
port=self.config.postgres.dbport,
user=self.config.postgres.dbuser,
password=self.config.postgres.dbpass,
database=self.config.postgres.dbname,
loop=self.loop, timeout=60,
ssl=ssl.create_default_context(
cafile=self.config.postgres.dbcert),
)
except (OSError, asyncio.TimeoutError, ConnectionError) as exc:
self.logger.error(exc)
raise
async def setup(self):
"""
Setup the database, i.e. create the table and set up the indexes.
"""
self.conn = await asyncpg.connect(
host=self.conf.dbhost,
port=self.conf.dbport,
user=self.conf.dbuser,
password=self.conf.dbpass,
database=self.conf.dbname,
loop=self.loop, timeout=60,
ssl=ssl.create_default_context(cafile=self.conf.dbcert),
)
await self.connect()
await self.conn.execute('''
CREATE TABLE IF NOT EXISTS statuses(
id SERIAL PRIMARY KEY,
@@ -116,21 +123,32 @@ class Db:
statuses_regex_matches ON statuses(regex_matches);
''')
async def save(self, data: Check):
async def write(self):
"""
Writes a single record in the database. This is not very optimal, a
better way would be to write a batch of status checks at once.
Writes batches of records to the database. The batch size is
defined in ``chweb.models.PostgresConfig.batch_size``.
"""
if self.conn is not None:
try:
await self.conn.execute(
'''INSERT INTO statuses (domain, regex, regex_matches,
request_time, response_time,
status, url)
VALUES($1, $2, $3, $4, $5, $6, $7)''',
data.domain, data.regex, data.regex_matches,
data.request_time, data.response_time, data.status,
data.url)
except asyncpg.PostgresError as exc:
self.logger.error("error in query %s", exc)
raise
await self.setup()
batch = []
while True:
if len(batch) < self.config.postgres.batch_size:
check = await self.queue.get()
batch.append(check)
else:
records = [(c.domain, c.regex, c.regex_matches, c.request_time,
c.response_time, c.status, c.url) for c in batch]
columns = ["domain", "regex", "regex_matches", "request_time",
"response_time", "status", "url"]
try:
result = await self.conn.copy_records_to_table(
"statuses", records=records, columns=columns)
self.logger.info(("Inserted %d records in the database "
"with result %s"), len(records), result)
except Exception as exc:
self.logger.error("error in query %s", exc)
raise
batch = []
def __call__(self) -> asyncio.Future:
return self.write()
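The accumulate-then-flush loop in write() is the core of this commit. A database-free sketch of the same pattern, with a print standing in for copy_records_to_table:

# stripped-down model of DbWriter.write(): fill a batch from the queue,
# flush when it reaches batch_size, then start over
import asyncio

async def flush(batch):
    print("flushing", batch)  # the real code calls copy_records_to_table

async def batcher(queue, batch_size):
    batch = []
    while True:
        if len(batch) < batch_size:
            batch.append(await queue.get())
        else:
            await flush(batch)
            batch = []

async def main():
    queue = asyncio.Queue()
    for i in range(7):
        queue.put_nowait(i)
    task = asyncio.ensure_future(batcher(queue, 3))
    await asyncio.sleep(0.1)  # prints two batches; item 6 waits in a partial batch
    task.cancel()

asyncio.run(main())

Note that, just as in the commit, a partially filled batch waits in memory until enough new checks arrive; flushing on a timeout as well would be a natural follow-up.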


@@ -49,6 +49,7 @@ class PostgresConfig(BaseModel):
dbuser: str = "vladan"
dbpass: str = ""
dbcert: str = ""
batch_size: int = 5
class SiteConfig(BaseModel):
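Since PostgresConfig is a Pydantic model, the new field is validated and coerced; a quick sketch, assuming the model's remaining fields are likewise defaulted:

from chweb.models import PostgresConfig

# pydantic coerces the string to the declared int type
cfg = PostgresConfig(batch_size="3")
assert cfg.batch_size == 3
assert PostgresConfig().batch_size == 5  # the default added above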


@@ -21,6 +21,7 @@ def config():
'dbname': "chweb",
'dbuser': "vladan",
'dbpass': "",
'batch_size': 3,
},
'sites': [{
'url': "https://example.com",
@@ -69,3 +70,27 @@ def check():
status=200,
url="https://example.com",
)
@pytest.fixture
def three_checks():
return [
Check(
domain="example.com",
response_time=51,
status=200,
url="https://example.com",
),
Check(
domain="example.com/200",
response_time=65,
status=200,
url="https://example.com",
),
Check(
domain="example.com/404",
response_time=35,
status=404,
url="https://example.com",
),
]


@@ -1,9 +1,7 @@
import asyncio
import aiokafka
from mock import AsyncMock, Mock, patch
import pytest
from mock import AsyncMock, Mock, patch
from chweb.consumer import Consumer
@@ -13,10 +11,9 @@ async def test_consumer_called(check, config, event_loop):
consumer = Consumer(config, Mock(), event_loop, Mock())
consumer.consumer = AsyncMock()
consumer.db = AsyncMock()
task = event_loop.create_task(consumer())
await asyncio.sleep(0)
consumer.db.setup.assert_called()
consumer.consumer.start.assert_called()
consumer.consumer.__aiter__.assert_called()
task.cancel()

tests/test_db.py (new file, 37 lines)

@@ -0,0 +1,37 @@
import asyncio
import pytest
from mock import AsyncMock, Mock
import chweb.consumer
from chweb.consumer import DbWriter
@pytest.mark.asyncio
async def test_db_setup(three_checks, config, event_loop):
queue = asyncio.Queue()
db_writer = DbWriter(config, Mock(), event_loop, queue)
assert db_writer.conn is None
chweb.consumer.asyncpg = AsyncMock()
await db_writer.connect()
chweb.consumer.asyncpg.connect.assert_called()
db_writer.conn = AsyncMock()
await db_writer.setup()
db_writer.conn.execute.assert_called()
for check in three_checks:
await queue.put(check)
print("&&&&&&&&&&")
print("&&&&&&&&&&")
print("&&&&&&&&&&")
print(queue.qsize())
print(config.postgres.batch_size)
print("&&&&&&&&&&")
print("&&&&&&&&&&")
print("&&&&&&&&&&")
task = event_loop.create_task(db_writer())
await asyncio.sleep(0.5)
db_writer.conn.copy_records_to_table.assert_called()
task.cancel()