Update config and SSL, i.e. make it work on aiven

This commit is contained in:
Vladan Popovic 2020-09-06 07:44:31 +02:00
parent 189681c5fa
commit 2dddf82886
5 changed files with 55 additions and 13 deletions

View file

@ -1,13 +1,18 @@
kafka: kafka:
servers: servers:
- "localhost:9992" - "kafka-f7ae38e-vladanovic-4654.aivencloud.com:23702"
topic: "sample" topic: "sitestats"
cafile: "./certs/ca.pem"
cert: "./certs/service.cert"
key: "./certs/service.key"
passwd: ""
postgres: postgres:
dbhost: "localhost" dbhost: "pg-2e0f365c-vladanovic-4654.aivencloud.com"
dbport: 5432 dbport: 23700
dbname: "chweb" dbname: "defaultdb"
dbuser: "vladan" dbuser: "avnadmin"
dbpass: "" dbpass: ""
dbcert: "./certs/pg.pem"
sites: sites:
- url: "https://dsadakjhkjsahkjh.com" - url: "https://dsadakjhkjsahkjh.com"
regex: "domain" regex: "domain"

View file

@ -8,6 +8,7 @@ from typing import Optional
from urllib.parse import urlparse from urllib.parse import urlparse
import aiokafka # type: ignore import aiokafka # type: ignore
from aiokafka.helpers import create_ssl_context # type: ignore
import requests import requests
from requests import exceptions as rqexc from requests import exceptions as rqexc
@ -86,9 +87,18 @@ class Producer(Service):
event_loop: asyncio.AbstractEventLoop, event_loop: asyncio.AbstractEventLoop,
queue: asyncio.Queue): queue: asyncio.Queue):
super().__init__(config, logger, event_loop, queue) super().__init__(config, logger, event_loop, queue)
context = create_ssl_context(
cafile=self.config.kafka.cafile,
certfile=self.config.kafka.cert,
keyfile=self.config.kafka.key,
password=self.config.kafka.passwd,
)
self.producer = aiokafka.AIOKafkaProducer( self.producer = aiokafka.AIOKafkaProducer(
loop=self.loop, loop=self.loop,
bootstrap_servers=self.config.kafka.servers) bootstrap_servers=self.config.kafka.servers,
security_protocol="SSL",
ssl_context=context,
)
async def produce(self): async def produce(self):
""" """
@ -96,6 +106,7 @@ class Producer(Service):
that reads from the queue and sends the messages to the topic defined that reads from the queue and sends the messages to the topic defined
in the config. in the config.
""" """
self.logger.info(self.config)
await self.producer.start() await self.producer.start()
try: try:
while True: while True:

View file

@ -39,6 +39,10 @@ def create_config(conf: Dict[str, Any]):
kafka_servers = kafka_servers_env.split(',') kafka_servers = kafka_servers_env.split(',')
kafka_topic = os.getenv('KAFKA_TOPIC') kafka_topic = os.getenv('KAFKA_TOPIC')
kafka_cafile = os.getenv('KAFKA_CA_FILE')
kafka_cert = os.getenv('KAFKA_CERT')
kafka_key = os.getenv('KAFKA_KEY')
kafka_pass = os.getenv('KAFKA_PASS')
pg_db = os.getenv('POSTGRES_DB') pg_db = os.getenv('POSTGRES_DB')
pg_host = os.getenv('POSTGRES_HOST') pg_host = os.getenv('POSTGRES_HOST')
@ -50,6 +54,11 @@ def create_config(conf: Dict[str, Any]):
config.kafka.servers = (kafka_servers if kafka_servers_env config.kafka.servers = (kafka_servers if kafka_servers_env
else config.kafka.servers) else config.kafka.servers)
config.kafka.topic = kafka_topic or config.kafka.topic config.kafka.topic = kafka_topic or config.kafka.topic
config.kafka.cafile = kafka_cafile or config.kafka.cafile
config.kafka.cert = kafka_topic or config.kafka.cert
config.kafka.key = kafka_key or config.kafka.key
config.kafka.passwd = kafka_pass or config.kafka.passwd
config.postgres.dbhost = pg_host or config.postgres.dbhost config.postgres.dbhost = pg_host or config.postgres.dbhost
config.postgres.dbname = pg_db or config.postgres.dbname config.postgres.dbname = pg_db or config.postgres.dbname
config.postgres.dbport = (int(pg_port) if pg_port is not None config.postgres.dbport = (int(pg_port) if pg_port is not None

View file

@ -4,9 +4,11 @@ Sample consumer.
import asyncio import asyncio
import json import json
import logging import logging
import ssl
from typing import Optional from typing import Optional
import aiokafka # type: ignore import aiokafka # type: ignore
from aiokafka.helpers import create_ssl_context # type: ignore
import asyncpg # type: ignore import asyncpg # type: ignore
from chweb.base import Service from chweb.base import Service
@ -26,10 +28,19 @@ class Consumer(Service):
queue: asyncio.Queue): queue: asyncio.Queue):
super().__init__(config, logger, event_loop, queue) super().__init__(config, logger, event_loop, queue)
self.db = Db(self.loop, self.logger, self.config.postgres) self.db = Db(self.loop, self.logger, self.config.postgres)
context = create_ssl_context(
cafile=self.config.kafka.cafile,
certfile=self.config.kafka.cert,
keyfile=self.config.kafka.key,
password=self.config.kafka.passwd,
)
self.consumer = aiokafka.AIOKafkaConsumer( self.consumer = aiokafka.AIOKafkaConsumer(
self.config.kafka.topic, self.config.kafka.topic,
loop=self.loop, loop=self.loop,
bootstrap_servers=self.config.kafka.servers) bootstrap_servers=self.config.kafka.servers,
security_protocol="SSL",
ssl_context=context,
)
async def consume(self): async def consume(self):
""" """
@ -46,7 +57,7 @@ class Consumer(Service):
self.logger.debug(check) self.logger.debug(check)
await self.db.save(check) await self.db.save(check)
except Exception as exc: except Exception as exc:
self.logger.error(exc) self.logger.exception(exc)
self.logger.info("Exiting due to previous errors!") self.logger.info("Exiting due to previous errors!")
finally: finally:
await self.consumer.stop() await self.consumer.stop()
@ -80,6 +91,7 @@ class Db:
password=self.conf.dbpass, password=self.conf.dbpass,
database=self.conf.dbname, database=self.conf.dbname,
loop=self.loop, timeout=60, loop=self.loop, timeout=60,
ssl=ssl.create_default_context(cafile=self.conf.dbcert),
) )
await self.conn.execute(''' await self.conn.execute('''
CREATE TABLE IF NOT EXISTS statuses( CREATE TABLE IF NOT EXISTS statuses(

View file

@ -31,8 +31,12 @@ class KafkaConfig(BaseModel):
""" """
Kafka broker configuration. Kafka broker configuration.
""" """
servers: List[str] = ["localhost:9992"] servers: List[str] = []
topic: str = "sample" topic: str
cafile: str
cert: str
key: str
passwd: str
class PostgresConfig(BaseModel): class PostgresConfig(BaseModel):
@ -44,6 +48,7 @@ class PostgresConfig(BaseModel):
dbname: str = "chweb" dbname: str = "chweb"
dbuser: str = "vladan" dbuser: str = "vladan"
dbpass: str = "" dbpass: str = ""
dbcert: str = ""
class SiteConfig(BaseModel): class SiteConfig(BaseModel):
@ -60,6 +65,6 @@ class Config(BaseModel):
Main application configuration. Same for the checker and the kafka Main application configuration. Same for the checker and the kafka
consumer / postgres writer for simplicity while deploying. consumer / postgres writer for simplicity while deploying.
""" """
kafka: KafkaConfig = KafkaConfig() kafka: KafkaConfig
postgres: PostgresConfig = PostgresConfig() postgres: PostgresConfig
sites: List[SiteConfig] = [] sites: List[SiteConfig] = []