diff --git a/config.yaml b/config.yaml
index df29755..1152d81 100644
--- a/config.yaml
+++ b/config.yaml
@@ -13,15 +13,22 @@ postgres:
   dbuser: "avnadmin"
   dbpass: ""
   dbcert: "./certs/pg.pem"
+  batch_size: 10
 sites:
-- url: "https://dsadakjhkjsahkjh.com"
-  regex: "domain"
-  check_interval: 5
 - url: "https://example.com"
-  regex: "aaaaaaaaaaaaa"
-  check_interval: 8
-- url: "https://example.com/404"
-  check_interval: 13
+  regex: "domain"
+  check_interval: 17
+- url: "https://github.com/status"
+  regex: "^Github lives!.*"
+  check_interval: 31
+- url: "https://medium.com/status"
+  check_interval: 11
+- url: "https://help.netflix.com/en/is-netflix-down"
+  regex: "Netflix is up"
+  check_interval: 37
+- url: "https://status.aiven.io/"
+  regex: "All Systems Operational"
+  check_interval: 43
 logging:
   version: 1
   formatters:
diff --git a/src/chweb/cmd.py b/src/chweb/cmd.py
index 3f8bb39..0f627bd 100644
--- a/src/chweb/cmd.py
+++ b/src/chweb/cmd.py
@@ -4,7 +4,7 @@ A module containing all console script functions.
 import asyncio
 
 from chweb.collector import Collector, Producer
-from chweb.consumer import Consumer
+from chweb.consumer import Consumer, DbWriter
 from chweb.config import configure
 
 
@@ -18,9 +18,11 @@ def collect():
     config, logger = configure("collector")
     logger.info("Starting collector for %d sites.", len(config.sites))
     collector = Collector(config, logger, loop, queue)
+    logger.info(("Starting kafka producer on kafka [cluster]/topic: "
+                 "%s/%s"), config.kafka.servers, config.kafka.topic)
     producer = Producer(config, logger, loop, queue)
+
     loop.run_until_complete(asyncio.gather(collector(), producer()))
@@ -35,4 +37,10 @@ def consume():
     logger.info(("Starting kafka consumer on kafka [cluster]/topic: "
                  "%s/%s"), config.kafka.servers, config.kafka.topic)
     consumer = Consumer(config, logger, loop, queue)
-    loop.run_until_complete(consumer())
+
+    logger.info(("Starting database writer to %s@%s:%d/%s"),
+                config.postgres.dbuser, config.postgres.dbhost,
+                config.postgres.dbport, config.postgres.dbname)
+    db_writer = DbWriter(config, logger, loop, queue)
+
+    loop.run_until_complete(asyncio.gather(consumer(), db_writer()))
diff --git a/src/chweb/config.py b/src/chweb/config.py
index f4299dc..7d2e779 100644
--- a/src/chweb/config.py
+++ b/src/chweb/config.py
@@ -12,9 +12,74 @@ import yaml
 from chweb.models import Config
 
 
+def set_kafka_config_from_env(config: Config):
+    """
+    This function reads all Kafka environment variables and stores them in the
+    config if they are set.
+    """
+    kafka_servers_env = os.getenv('KAFKA_SERVERS')
+    if kafka_servers_env is not None:
+        kafka_servers = kafka_servers_env.split(',')
+
+    kafka_topic = os.getenv('KAFKA_TOPIC')
+    kafka_cafile = os.getenv('KAFKA_CA_FILE')
+    kafka_cert = os.getenv('KAFKA_CERT')
+    kafka_key = os.getenv('KAFKA_KEY')
+    kafka_pass = os.getenv('KAFKA_PASS')
+
+    config.kafka.servers = (kafka_servers if kafka_servers_env
+                            else config.kafka.servers)
+    config.kafka.topic = kafka_topic or config.kafka.topic
+    config.kafka.cafile = kafka_cafile or config.kafka.cafile
+    config.kafka.cert = kafka_cert or config.kafka.cert
+    config.kafka.key = kafka_key or config.kafka.key
+    config.kafka.passwd = kafka_pass or config.kafka.passwd
+
+
+def set_postgres_config_from_env(config: Config):
+    """
+    This function reads all Postgres environment variables and stores them in
+    the config if they are set.
+
+    The variables that are required as ints are converted with ``int``;
+    variables that are left unset keep the default values from the config.
+    """
+    pg_db = os.getenv('POSTGRES_DB')
+    pg_host = os.getenv('POSTGRES_HOST')
+    pg_port = os.getenv('POSTGRES_PORT')
+    pg_user = os.getenv('POSTGRES_USER')
+    pg_pass = os.getenv('POSTGRES_PASS')
+    pg_cert = os.getenv('POSTGRES_CERT')
+    batch_size = os.getenv('BATCH_SIZE')
+
+    config.postgres.dbhost = pg_host or config.postgres.dbhost
+    config.postgres.dbname = pg_db or config.postgres.dbname
+    config.postgres.dbport = (int(pg_port) if pg_port is not None
+                              else config.postgres.dbport)
+    config.postgres.dbuser = pg_user or config.postgres.dbuser
+    config.postgres.dbpass = pg_pass or config.postgres.dbpass
+    config.postgres.dbcert = pg_cert or config.postgres.dbcert
+    config.postgres.batch_size = (int(batch_size) if batch_size is not None
+                                  else config.postgres.batch_size)
+
+
+def create_config(conf: Dict[str, Any]) -> Config:
+    """
+    Creates a :class:`chweb.models.Config` object, updates it with the
+    environment variables' values (if they are set) for both Kafka and
+    Postgres and returns the created :class:`chweb.models.Config` object.
+    """
+    config = Config(**conf)
+    set_kafka_config_from_env(config)
+    set_postgres_config_from_env(config)
+    return config
+
+
 def configure(name) -> Tuple[Config, Logger]:
     """
     Gets the configuration and creates a Pydantic model from the parsed YAML.
+
+    This function also sets up the logger and returns it together with the
+    config.
     """
     parser = argparse.ArgumentParser(
         description='Website availibility checker.')
@@ -31,41 +96,3 @@ def configure(name) -> Tuple[Config, Logger]:
     config = create_config(conf)
     logger = logging.getLogger("chweb.{}".format(name))
     return (config, logger)
-
-
-def create_config(conf: Dict[str, Any]):
-    kafka_servers_env = os.getenv('KAFKA_SERVERS')
-    if kafka_servers_env is not None:
-        kafka_servers = kafka_servers_env.split(',')
-
-    kafka_topic = os.getenv('KAFKA_TOPIC')
-    kafka_cafile = os.getenv('KAFKA_CA_FILE')
-    kafka_cert = os.getenv('KAFKA_CERT')
-    kafka_key = os.getenv('KAFKA_KEY')
-    kafka_pass = os.getenv('KAFKA_PASS')
-
-    pg_db = os.getenv('POSTGRES_DB')
-    pg_host = os.getenv('POSTGRES_HOST')
-    pg_port = os.getenv('POSTGRES_PORT')
-    pg_user = os.getenv('POSTGRES_USER')
-    pg_pass = os.getenv('POSTGRES_PASS')
-    pg_cert = os.getenv('POSTGRES_CERT')
-
-    config = Config(**conf)
-    config.kafka.servers = (kafka_servers if kafka_servers_env
-                            else config.kafka.servers)
-    config.kafka.topic = kafka_topic or config.kafka.topic
-    config.kafka.cafile = kafka_cafile or config.kafka.cafile
-    config.kafka.cert = kafka_cert or config.kafka.cert
-    config.kafka.key = kafka_key or config.kafka.key
-    config.kafka.passwd = kafka_pass or config.kafka.passwd
-
-    config.postgres.dbhost = pg_host or config.postgres.dbhost
-    config.postgres.dbname = pg_db or config.postgres.dbname
-    config.postgres.dbport = (int(pg_port) if pg_port is not None
-                              else config.postgres.dbport)
-    config.postgres.dbuser = pg_user or config.postgres.dbuser
-    config.postgres.dbpass = pg_pass or config.postgres.dbpass
-    config.postgres.dbcert = pg_cert or config.postgres.dbcert
-
-    return config
diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py
index 2009634..75d4e7a 100644
--- a/src/chweb/consumer.py
+++ b/src/chweb/consumer.py
@@ -12,7 +12,7 @@ from aiokafka.helpers import create_ssl_context  # type: ignore
 import asyncpg  # type: ignore
 
 from chweb.base import Service
-from chweb.models import Check, Config, PostgresConfig
+from chweb.models import Check, Config
 
 
 class Consumer(Service):
@@ -27,7 +27,6 @@ class Consumer(Service):
                  event_loop: asyncio.AbstractEventLoop,
                  queue: asyncio.Queue):
         super().__init__(config, logger, event_loop, queue)
-        self.db = Db(self.loop, self.logger, self.config.postgres)
         context = create_ssl_context(
             cafile=self.config.kafka.cafile,
             certfile=self.config.kafka.cert,
@@ -48,14 +47,13 @@ class Consumer(Service):
         """
         await self.consumer.start()
         try:
-            await self.db.setup()
             async for msg in self.consumer:
                 # if anything here fails break the loop and exit since it's
                 # something out of our control and we don't want to work
                 # with broken data
                 check = Check(**json.loads(msg.value))
                 self.logger.debug(check)
-                await self.db.save(check)
+                self.queue.put_nowait(check)
         except Exception as exc:
             self.logger.exception(exc)
             self.logger.info("Exiting due to previous errors!")
@@ -66,33 +64,42 @@ class Consumer(Service):
         return self.consume()
 
 
-class Db:
+class DbWriter(Service):
     """
     Database operations and handy helpers.
     """
-    def __init__(self, loop: asyncio.AbstractEventLoop, logger: logging.Logger,
-                 pgconf: PostgresConfig):
-        self.loop = loop
-        self.logger = logger
-        # Do a side effect here since without this there's not any point for
-        # the application to start. Applies for tests as well.
+    def __init__(self, config: Config,
+                 logger: logging.Logger,
+                 event_loop: asyncio.AbstractEventLoop,
+                 queue: asyncio.Queue):
+        super().__init__(config, logger, event_loop, queue)
         self.conn: Optional[asyncpg.Connection] = None
-        self.conf = pgconf
+
+    async def connect(self):
+        """
+        Connects to the database and stores the connection in ``self.conn``.
+        """
+        try:
+            self.conn = await asyncpg.connect(
+                host=self.config.postgres.dbhost,
+                port=self.config.postgres.dbport,
+                user=self.config.postgres.dbuser,
+                password=self.config.postgres.dbpass,
+                database=self.config.postgres.dbname,
+                loop=self.loop, timeout=60,
+                ssl=ssl.create_default_context(
+                    cafile=self.config.postgres.dbcert),
+            )
+        except (OSError, asyncio.TimeoutError, ConnectionError) as exc:
+            self.logger.error(exc)
+            raise
 
     async def setup(self):
         """
         Setup the database, i.e. create the table and set up the indexes.
         """
-        self.conn = await asyncpg.connect(
-            host=self.conf.dbhost,
-            port=self.conf.dbport,
-            user=self.conf.dbuser,
-            password=self.conf.dbpass,
-            database=self.conf.dbname,
-            loop=self.loop, timeout=60,
-            ssl=ssl.create_default_context(cafile=self.conf.dbcert),
-        )
+        await self.connect()
         await self.conn.execute('''
             CREATE TABLE IF NOT EXISTS statuses(
                 id SERIAL PRIMARY KEY,
@@ -116,21 +123,32 @@ class Db:
                 statuses_regex_matches ON statuses(regex_matches);
         ''')
 
-    async def save(self, data: Check):
+    async def write(self):
         """
-        Writes a single record in the database. This is not very optimal, a
-        better way would be to write a batch of status checks at once.
+        Writes a batch of records to the database. The batch size is
+        defined in ``chweb.models.PostgresConfig.batch_size``.
""" - if self.conn is not None: - try: - await self.conn.execute( - '''INSERT INTO statuses (domain, regex, regex_matches, - request_time, response_time, - status, url) - VALUES($1, $2, $3, $4, $5, $6, $7)''', - data.domain, data.regex, data.regex_matches, - data.request_time, data.response_time, data.status, - data.url) - except asyncpg.PostgresError as exc: - self.logger.error("error in query %s", exc) - raise + await self.setup() + + batch = [] + while True: + if len(batch) < self.config.postgres.batch_size: + check = await self.queue.get() + batch.append(check) + else: + records = [(c.domain, c.regex, c.regex_matches, c.request_time, + c.response_time, c.status, c.url) for c in batch] + columns = ["domain", "regex", "regex_matches", "request_time", + "response_time", "status", "url"] + try: + result = await self.conn.copy_records_to_table( + "statuses", records=records, columns=columns) + self.logger.info(("Inserted %d records in the database " + "with result %s"), len(records), result) + except Exception as exc: + self.logger.error("error in query %s", exc) + raise + batch = [] + + def __call__(self) -> asyncio.Future: + return self.write() diff --git a/src/chweb/models.py b/src/chweb/models.py index 3a18018..1f83b56 100644 --- a/src/chweb/models.py +++ b/src/chweb/models.py @@ -49,6 +49,7 @@ class PostgresConfig(BaseModel): dbuser: str = "vladan" dbpass: str = "" dbcert: str = "" + batch_size: int = 5 class SiteConfig(BaseModel): diff --git a/tests/conftest.py b/tests/conftest.py index ec042a1..151cf13 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,6 +21,7 @@ def config(): 'dbname': "chweb", 'dbuser': "vladan", 'dbpass': "", + 'batch_size': 3, }, 'sites': [{ 'url': "https://example.com", @@ -69,3 +70,27 @@ def check(): status=200, url="https://example.com", ) + + +@pytest.fixture +def three_checks(): + return [ + Check( + domain="example.com", + response_time=51, + status=200, + url="https://example.com", + ), + Check( + domain="example.com/200", + response_time=65, + status=200, + url="https://example.com", + ), + Check( + domain="example.com/404", + response_time=35, + status=404, + url="https://example.com", + ), + ] diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 89712c8..234d623 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -1,9 +1,7 @@ import asyncio - -import aiokafka -from mock import AsyncMock, Mock, patch import pytest +from mock import AsyncMock, Mock, patch from chweb.consumer import Consumer @@ -13,10 +11,9 @@ async def test_consumer_called(check, config, event_loop): consumer = Consumer(config, Mock(), event_loop, Mock()) consumer.consumer = AsyncMock() - consumer.db = AsyncMock() task = event_loop.create_task(consumer()) await asyncio.sleep(0) - consumer.db.setup.assert_called() consumer.consumer.start.assert_called() + consumer.consumer.__aiter__.assert_called() task.cancel() diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..4828c41 --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,37 @@ +import asyncio + +import pytest +from mock import AsyncMock, Mock + +import chweb.consumer +from chweb.consumer import DbWriter + + +@pytest.mark.asyncio +async def test_db_setup(three_checks, config, event_loop): + queue = asyncio.Queue() + db_writer = DbWriter(config, Mock(), event_loop, queue) + assert db_writer.conn is None + + chweb.consumer.asyncpg = AsyncMock() + await db_writer.connect() + chweb.consumer.asyncpg.connect.assert_called() + + db_writer.conn = AsyncMock() + await 
+    await db_writer.setup()
+    db_writer.conn.execute.assert_called()
+
+    for check in three_checks:
+        await queue.put(check)
+
+    task = event_loop.create_task(db_writer())
+    await asyncio.sleep(0.5)
+    db_writer.conn.copy_records_to_table.assert_called()
+    task.cancel()
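
Usage sketch (illustrative, not part of the patch): the new
set_kafka_config_from_env/set_postgres_config_from_env helpers let
create_config override any YAML value from the environment, so the same
config.yaml can be reused across deployments. A minimal sketch, assuming
config.yaml parses into the keys that chweb.models.Config expects; the
exported values below are placeholders, not project defaults:

    import os

    import yaml

    from chweb.config import create_config

    # Exported variables win over the YAML values; unset ones keep them.
    os.environ["POSTGRES_PORT"] = "5432"  # placeholder port
    os.environ["BATCH_SIZE"] = "50"       # becomes config.postgres.batch_size

    with open("config.yaml") as conf_file:
        config = create_config(yaml.safe_load(conf_file))

    assert config.postgres.dbport == 5432
    assert config.postgres.batch_size == 50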