2020-09-06 00:23:34 +02:00
|
|
|
import asyncio
|
|
|
|
|
|
|
|
import aiokafka
|
2020-09-06 03:18:52 +02:00
|
|
|
from mock import Mock, AsyncMock
|
2020-09-06 00:23:34 +02:00
|
|
|
import pytest
|
|
|
|
|
|
|
|
from chweb.collector import Producer
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_producer_called(check, config, event_loop):
    """Check that a queued item is handed to ``send_and_wait``.

    A ``check`` fixture is placed on the queue, the producer is started as a
    task, and the test asserts that the mocked ``send_and_wait`` was called
    with the configured topic and the UTF-8 encoded JSON of the check.

    The task is always cancelled *and awaited* on exit, including when the
    assertion fails, so no pending task outlives the test.
    """
    queue = asyncio.Queue()
    producer = Producer(config, Mock(), event_loop, queue)
    await queue.put(check)
    # Replace the ``producer`` attribute with an AsyncMock so that the calls
    # made on it can be awaited and inspected.
    producer.producer = AsyncMock()

    task = event_loop.create_task(producer())
    try:
        # Yield to the event loop once so the freshly created task can run.
        await asyncio.sleep(0)
        # ``str.encode`` already returns ``bytes``; no extra wrapping needed.
        producer.producer.send_and_wait.assert_called_with(
            config.kafka.topic, check.json().encode('utf-8'))
    finally:
        # ``cancel()`` only requests cancellation; awaiting the task lets it
        # actually unwind before the test returns.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_producer_called_invalid(config, event_loop):
    """Check that an invalid queue item is logged and ends the producer task.

    An empty string is queued in place of a check. After the producer task
    has had one turn on the event loop, the test expects the (mocked) logger
    to have recorded an error and the task to be finished.
    """
    # Arrange: a producer whose ``producer`` attribute and logger are mocks.
    pending_items = asyncio.Queue()
    kafka_producer = Producer(config, Mock(), event_loop, pending_items)
    kafka_producer.producer = AsyncMock()
    await pending_items.put('')

    # Act: start the producer and yield once so that it gets to run.
    running = event_loop.create_task(kafka_producer())
    await asyncio.sleep(0)

    # Assert: the failure was logged and the task did not keep running.
    kafka_producer.logger.error.assert_called()
    assert running.done()
|