diff --git a/config.yaml b/config.yaml index 6ceffae..df29755 100644 --- a/config.yaml +++ b/config.yaml @@ -1,13 +1,18 @@ kafka: servers: - - "localhost:9992" - topic: "sample" + - "kafka-f7ae38e-vladanovic-4654.aivencloud.com:23702" + topic: "sitestats" + cafile: "./certs/ca.pem" + cert: "./certs/service.cert" + key: "./certs/service.key" + passwd: "" postgres: - dbhost: "localhost" - dbport: 5432 - dbname: "chweb" - dbuser: "vladan" + dbhost: "pg-2e0f365c-vladanovic-4654.aivencloud.com" + dbport: 23700 + dbname: "defaultdb" + dbuser: "avnadmin" dbpass: "" + dbcert: "./certs/pg.pem" sites: - url: "https://dsadakjhkjsahkjh.com" regex: "domain" diff --git a/src/chweb/collector.py b/src/chweb/collector.py index c2b2d20..68392bd 100644 --- a/src/chweb/collector.py +++ b/src/chweb/collector.py @@ -8,6 +8,7 @@ from typing import Optional from urllib.parse import urlparse import aiokafka # type: ignore +from aiokafka.helpers import create_ssl_context # type: ignore import requests from requests import exceptions as rqexc @@ -86,9 +87,18 @@ class Producer(Service): event_loop: asyncio.AbstractEventLoop, queue: asyncio.Queue): super().__init__(config, logger, event_loop, queue) + context = create_ssl_context( + cafile=self.config.kafka.cafile, + certfile=self.config.kafka.cert, + keyfile=self.config.kafka.key, + password=self.config.kafka.passwd, + ) self.producer = aiokafka.AIOKafkaProducer( loop=self.loop, - bootstrap_servers=self.config.kafka.servers) + bootstrap_servers=self.config.kafka.servers, + security_protocol="SSL", + ssl_context=context, + ) async def produce(self): """ @@ -96,6 +106,7 @@ class Producer(Service): that reads from the queue and sends the messages to the topic defined in the config. """ + self.logger.info(self.config) await self.producer.start() try: while True: diff --git a/src/chweb/config.py b/src/chweb/config.py index e9521ba..b26146f 100644 --- a/src/chweb/config.py +++ b/src/chweb/config.py @@ -39,6 +39,10 @@ def create_config(conf: Dict[str, Any]): kafka_servers = kafka_servers_env.split(',') kafka_topic = os.getenv('KAFKA_TOPIC') + kafka_cafile = os.getenv('KAFKA_CA_FILE') + kafka_cert = os.getenv('KAFKA_CERT') + kafka_key = os.getenv('KAFKA_KEY') + kafka_pass = os.getenv('KAFKA_PASS') pg_db = os.getenv('POSTGRES_DB') pg_host = os.getenv('POSTGRES_HOST') @@ -50,6 +54,11 @@ def create_config(conf: Dict[str, Any]): config.kafka.servers = (kafka_servers if kafka_servers_env else config.kafka.servers) config.kafka.topic = kafka_topic or config.kafka.topic + config.kafka.cafile = kafka_cafile or config.kafka.cafile + config.kafka.cert = kafka_topic or config.kafka.cert + config.kafka.key = kafka_key or config.kafka.key + config.kafka.passwd = kafka_pass or config.kafka.passwd + config.postgres.dbhost = pg_host or config.postgres.dbhost config.postgres.dbname = pg_db or config.postgres.dbname config.postgres.dbport = (int(pg_port) if pg_port is not None diff --git a/src/chweb/consumer.py b/src/chweb/consumer.py index 7da5bdb..2009634 100644 --- a/src/chweb/consumer.py +++ b/src/chweb/consumer.py @@ -4,9 +4,11 @@ Sample consumer. import asyncio import json import logging +import ssl from typing import Optional import aiokafka # type: ignore +from aiokafka.helpers import create_ssl_context # type: ignore import asyncpg # type: ignore from chweb.base import Service @@ -26,10 +28,19 @@ class Consumer(Service): queue: asyncio.Queue): super().__init__(config, logger, event_loop, queue) self.db = Db(self.loop, self.logger, self.config.postgres) + context = create_ssl_context( + cafile=self.config.kafka.cafile, + certfile=self.config.kafka.cert, + keyfile=self.config.kafka.key, + password=self.config.kafka.passwd, + ) self.consumer = aiokafka.AIOKafkaConsumer( self.config.kafka.topic, loop=self.loop, - bootstrap_servers=self.config.kafka.servers) + bootstrap_servers=self.config.kafka.servers, + security_protocol="SSL", + ssl_context=context, + ) async def consume(self): """ @@ -46,7 +57,7 @@ class Consumer(Service): self.logger.debug(check) await self.db.save(check) except Exception as exc: - self.logger.error(exc) + self.logger.exception(exc) self.logger.info("Exiting due to previous errors!") finally: await self.consumer.stop() @@ -80,6 +91,7 @@ class Db: password=self.conf.dbpass, database=self.conf.dbname, loop=self.loop, timeout=60, + ssl=ssl.create_default_context(cafile=self.conf.dbcert), ) await self.conn.execute(''' CREATE TABLE IF NOT EXISTS statuses( diff --git a/src/chweb/models.py b/src/chweb/models.py index 57efbc2..3a18018 100644 --- a/src/chweb/models.py +++ b/src/chweb/models.py @@ -31,8 +31,12 @@ class KafkaConfig(BaseModel): """ Kafka broker configuration. """ - servers: List[str] = ["localhost:9992"] - topic: str = "sample" + servers: List[str] = [] + topic: str + cafile: str + cert: str + key: str + passwd: str class PostgresConfig(BaseModel): @@ -44,6 +48,7 @@ class PostgresConfig(BaseModel): dbname: str = "chweb" dbuser: str = "vladan" dbpass: str = "" + dbcert: str = "" class SiteConfig(BaseModel): @@ -60,6 +65,6 @@ class Config(BaseModel): Main application configuration. Same for the checker and the kafka consumer / postgres writer for simplicity while deploying. """ - kafka: KafkaConfig = KafkaConfig() - postgres: PostgresConfig = PostgresConfig() + kafka: KafkaConfig + postgres: PostgresConfig sites: List[SiteConfig] = []