import json import time import requests import pika from .config import HOST, PORT, RTMQ_USERNAME, RTMQ_PASSWORD, RTMQ_HOST, RTMQ_PORT, RTMQ_VHOST class CallbackClient: """ Класс отвечает за получение информации о решении капчи с call-back сервера """ def __init__(self, task_id: int, queue_name: str = None, call_type: str = 'cache'): """ :param task_id: ID полученное при создании задания в сервисе AntiCaptcha :param queue_name: Название очереди выбранное и переданное последним параметров в URL для `pingback`. Если передан параметр `call_type=queue` то поле это обязательное. :param call_type: Ресурс к которому будут отправлятся запросы: `cache` или `queue` """ # заполняем данные IP/PORT self.host = HOST self.port = PORT # ID задания self.task_id = int(task_id) # тип запросов к серверу self.call_type = call_type if self.call_type in ('cache', 'queue'): if self.call_type == 'queue': if queue_name: self.queue_name = queue_name # заполянем данные для подключения к очереди self.rtmq_username = RTMQ_USERNAME self.rtmq_password = RTMQ_PASSWORD self.rtmq_host = RTMQ_HOST self.rtmq_port = RTMQ_PORT self.rtmq_vhost = RTMQ_VHOST else: raise ValueError('\nВыбран тип получения решения `call_type=queue`, но не передан параметр названия очереди - `queue_name`. ' f'\n\tПередайте параметр `queue_name` или измените `call_type=queue` на `call_type=cache`'\ '\nYou select `call_type=queue` but don`t set `queue_name` param.' f'\n\tSet `queue_name` param or change `call_type=queue` to `call_type=cache`') else: raise ValueError('\nПередан неверный формат для запросов к callback серверу. ' f'\n\tВозможные варинты: `cache` или `queue`. Вы передали - `{self.call_type}`' '\nWrong `call_type` parameter. Valid params: `cache` or `queue`.' f'\n\tYour param - `{self.call_type}`') def __handle_queue_message(self, requests_timeout: int): """ Метод отвечает за подключение к RabbitMQ очереди и ожидания сообщения с нужным `task_id` :param requests_timeout: Время между запросами к серверу. """ # кол-во попыток на получение результата attempts = 20 # подключаемся к RabbitMQ и устанавливаем осединение + канал parameters = pika.URLParameters(f'amqp://{self.rtmq_username}:{self.rtmq_password}@{self.rtmq_host}:{self.rtmq_port}/{self.rtmq_vhost}') connection = pika.BlockingConnection(parameters=parameters) channel = connection.channel() while attempts>0: # получение сообщения из очереди method_frame, header_frame, body = channel.basic_get(self.queue_name) if body: # декодируем сообщение из bytes в JSON json_body = json.loads(body.decode()) # если ID задания сообщения из очереди совпадает с ID требуемого задания - возвращаем его. # если ID не совпадают - ожидаем дальше if int(json_body.get('taskId')) == self.task_id: channel.basic_ack(method_frame.delivery_tag) connection.close() return json_body # ставим небольшую задержку что бы не спамить сервер rabbitmq time.sleep(requests_timeout) # уменьшаем счётчик попыток attempts -= 1 return False def __handle_cache_message(self, requests_timeout: int): """ Метод отвечает за подключение к серверу, передачу `task_id` и получение резульатата решения капчи из кеша :param requests_timeout: Время между запросами к серверу. """ # кол-во попыток на получение результата attempts = 20 # создание сессии session = requests.Session() while attempts>0: with session.get(f'http://{self.host}:{self.port}/anticaptcha/cache/{self.task_id}') as resp: json_body = resp.json() # если получен результат решения капчи, а не информация об отсутсвии решения if json_body.get('status') != 'processing': return json_body else: # ставим небольшую задержку что бы не спамить сервер rabbitmq time.sleep(requests_timeout) # уменьшаем счётчик попыток attempts -= 1 return False def captcha_handler(self, requests_timeout: int = 1, auth_params: dict = None): """ Метод отвечает за получение результата решения капчи с callback сервера :param requests_timeout: Время между запросами к серверу. :param auth_params: передаются параметры в формате JSON для подключения к удалённому серверу: { 'host': '85.255.8.26', 'port': '8001', 'rtmq_username': 'hardworker_1', 'rtmq_password': 'password', 'rtmq_host': '85.255.8.26', 'rtmq_port': '5672', 'rtmq_vhost': 'anticaptcha_vhost', } :return: JSON с решением капчи. Формат - {'id':, 'code':} """ # если переданы кастомные параметры для подключения к серверу или очереди RabbitMQ if auth_params: # кастомные параметры для подключения к серверу self.host = auth_params['host'] if auth_params.get('host') else self.host self.port = auth_params['port'] if auth_params.get('port') else self.port # кастомные параметры для подключения к очереди if self.call_type == 'queue': self.rtmq_username = auth_params['rtmq_username'] if auth_params.get('rtmq_username') else self.rtmq_username self.rtmq_password = auth_params['rtmq_password'] if auth_params.get('rtmq_password') else self.rtmq_password self.rtmq_host = auth_params['rtmq_host'] if auth_params.get('rtmq_host') else self.rtmq_host self.rtmq_port = auth_params['rtmq_port'] if auth_params.get('rtmq_port') else self.rtmq_port self.rtmq_vhost = auth_params['rtmq_vhost'] if auth_params.get('rtmq_vhost') else self.rtmq_vhost # получение данных из кеша if self.call_type == 'cache': result = self.__handle_cache_message(requests_timeout=requests_timeout) # получение данных из очереди else: result = self.__handle_queue_message(requests_timeout=requests_timeout) # если результат не был получен if not result: result = {'taskId': self.task_id, 'message': {'taskId': self.task_id, 'status': 'processing'}} return result