Added integration test of failed authentication to redis

This commit is contained in:
Matus Valo 2021-11-05 23:47:06 +01:00 committed by Asif Saif Uddin
parent 8391403186
commit 34500e1e2c
1 changed files with 22 additions and 2 deletions

View File

@ -2,6 +2,7 @@ import os
from time import sleep
import pytest
import redis
import kombu
@ -9,9 +10,15 @@ from .common import BaseExchangeTypes, BasePriority, BasicFunctionality
def get_connection(
hostname, port, vhost, transport_options=None):
hostname, port, vhost, user_name=None, password=None,
transport_options=None):
credentials = f'{user_name}:{password}@' if user_name else ''
return kombu.Connection(
f'redis://{hostname}:{port}', transport_options=transport_options)
f'redis://{credentials}{hostname}:{port}',
transport_options=transport_options
)
@pytest.fixture(params=[None, {'global_keyprefix': '_prefixed_'}])
@ -32,6 +39,19 @@ def invalid_connection():
return kombu.Connection('redis://localhost:12345')
@pytest.mark.env('redis')
def test_failed_credentials():
"""Tests denied connection when wrong credentials were provided"""
with pytest.raises(redis.exceptions.ResponseError):
get_connection(
hostname=os.environ.get('REDIS_HOST', 'localhost'),
port=os.environ.get('REDIS_6379_TCP', '6379'),
vhost=None,
user_name='wrong_redis_user',
password='wrong_redis_password'
).connect()
@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_RedisBasicFunctionality(BasicFunctionality):