diff --git a/src/python3_anticaptcha/core/base.py b/src/python3_anticaptcha/core/base.py index fe4c236..c7a8ee9 100644 --- a/src/python3_anticaptcha/core/base.py +++ b/src/python3_anticaptcha/core/base.py @@ -1,30 +1,25 @@ +import os import time +import uuid +import base64 import asyncio import logging -from typing import Union, Optional +from typing import Optional from urllib import parse +from pathlib import Path import aiohttp import requests from requests.adapters import HTTPAdapter -from python3_anticaptcha.core.enum import CaptchaTypeEnm, ResponseStatusEnm -from python3_anticaptcha.core.config import ( - RETRIES, - BASE_REQUEST_URL, - GET_RESULT_POSTFIX, - CREATE_TASK_POSTFIX, - attempts_generator, -) -from python3_anticaptcha.core.serializer import ( - CreateTaskRequestSer, - CreateTaskResponseSer, - GetTaskResultRequestSer, - GetTaskResultResponseSer, -) +from .enum import SaveFormatsEnm +from .config import RETRIES, BASE_REQUEST_URL, CREATE_TASK_POSTFIX +from .serializer import CreateTaskBaseSer, CreateTaskResponseSer, GetTaskResultRequestSer, GetTaskResultResponseSer +from .result_handler import get_sync_result, get_async_result class BaseCaptcha: + NO_CAPTCHA_ERR = "You did not send any file, local link or URL." """ Basic Captcha solving class @@ -35,24 +30,16 @@ class BaseCaptcha: request_url: API address for sending requests """ - def __init__( - self, - api_key: str, - captcha_type: Union[CaptchaTypeEnm, str], - sleep_time: int = 15, - **kwargs, - ): - # validate captcha_type parameter - if captcha_type not in CaptchaTypeEnm.list_values(): - raise ValueError(f"Invalid `captcha_type` parameter set, available - {CaptchaTypeEnm.list_values()}") + def __init__(self, api_key: str, sleep_time: int = 15): self.__sleep_time = sleep_time # assign args to validator - self._params = CreateTaskRequestSer(clientKey=api_key, **locals()) + self.create_task_payload = CreateTaskBaseSer(clientKey=api_key) # `task` body for task creation payload self.task_params = {} # prepare `get task result` payload self._get_result_params = GetTaskResultRequestSer(clientKey=api_key) + self.result = GetTaskResultResponseSer() # prepare session self._session = requests.Session() @@ -66,21 +53,27 @@ class BaseCaptcha: def _processing_captcha(self) -> dict: # added task params to payload - self._params.task = self.task_params + self.create_task_payload.task.update(self.task_params) created_task = self._create_task() if created_task.errorId == 0: self._get_result_params.taskId = created_task.taskId - return self._get_result().dict() - return created_task.dict() + else: + return created_task.to_dict() + + time.sleep(self.__sleep_time) + + return get_sync_result(result_payload=self._get_result_params, sleep_time=self.__sleep_time) def _create_task(self, url_postfix: str = CREATE_TASK_POSTFIX) -> CreateTaskResponseSer: """ Function send SYNC request to service and wait for result """ try: - resp = self._session.post(parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self._params.dict()) + resp = self._session.post( + parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self.create_task_payload.to_dict() + ) if resp.status_code == 200: return CreateTaskResponseSer(**resp.json()) else: @@ -89,37 +82,6 @@ class BaseCaptcha: logging.exception(error) raise - def _get_result(self) -> GetTaskResultResponseSer: - """ - Method send SYNC `getTaskResult` request to service and wait for result - """ - # initial waiting - time.sleep(self.__sleep_time) - - attempts = attempts_generator() - for _ in attempts: - try: - task_result_response = self._session.post( - parse.urljoin(BASE_REQUEST_URL, GET_RESULT_POSTFIX), json=self._get_result_params.dict() - ) - if task_result_response.status_code == 200: - task_result_data = GetTaskResultResponseSer(**task_result_response.json()) - - if task_result_data.errorId == 0: - if task_result_data.status == ResponseStatusEnm.processing: - time.sleep(self.__sleep_time) - else: - task_result_data.taskId = self._get_result_params.taskId - return task_result_data - else: - task_result_data.taskId = self._get_result_params.taskId - return task_result_data - else: - raise ValueError(task_result_response.raise_for_status()) - except Exception as error: - logging.exception(error) - raise - @staticmethod def _send_post_request( payload: Optional[dict] = None, @@ -139,21 +101,91 @@ class BaseCaptcha: logging.exception(error) raise + def url_open(self, url: str, **kwargs): + """ + Method open links + """ + return self._session.get(url=url, **kwargs) + + @staticmethod + def _local_file_captcha(captcha_file: str): + """ + Method get local file, read it and prepare for sending to Captcha solving service + """ + with open(captcha_file, "rb") as file: + return file.read() + + def _file_const_saver(self, content: bytes, file_path: str, file_extension: str = "png"): + """ + Method create and save file in folder + """ + Path(file_path).mkdir(parents=True, exist_ok=True) + + # generate image name + self._file_name = f"file-{uuid.uuid4()}.{file_extension}" + + # save image to folder + with open(os.path.join(file_path, self._file_name), "wb") as out_image: + out_image.write(content) + + def _body_file_processing( + self, + save_format: SaveFormatsEnm, + file_path: str, + file_extension: str = "png", + captcha_link: Optional[str] = None, + captcha_file: Optional[str] = None, + captcha_base64: Optional[bytes] = None, + **kwargs, + ): + # if a local file link is passed + if captcha_file: + self.create_task_payload.task.update( + {"body": base64.b64encode(self._local_file_captcha(captcha_file)).decode("utf-8")} + ) + # if the file is transferred in base64 encoding + elif captcha_base64: + self.create_task_payload.task.update({"body": base64.b64encode(captcha_base64).decode("utf-8")}) + # if a URL is passed + elif captcha_link: + try: + content = self.url_open(url=captcha_link, **kwargs).content + logging.warning(f"{save_format = }") + logging.warning(f"{save_format == SaveFormatsEnm.CONST.value = }") + logging.warning(f"{save_format == SaveFormatsEnm.CONST = }") + # according to the value of the passed parameter, select the function to save the image + if save_format == SaveFormatsEnm.CONST.value: + logging.warning(f"TRY TO CREATE FOLDER WITH FILE") + self._file_const_saver(content, file_path, file_extension=file_extension) + logging.warning(f"TRY TO CREATE FOLDER") + self.create_task_payload.task.update({"body": base64.b64encode(content).decode("utf-8")}) + except Exception as error: + self.result.errorId = 12 + self.result.errorCode = self.NO_CAPTCHA_ERR + self.result.errorDescription = str(error) + + else: + self.result.errorId = 12 + self.result.errorCode = self.NO_CAPTCHA_ERR + """ Async part """ async def _aio_processing_captcha(self) -> dict: # added task params to payload - self._params.task = self.task_params + self.create_task_payload.task = self.task_params created_task = await self._aio_create_task() if created_task.errorId == 0: self._get_result_params.taskId = created_task.taskId - result = await self._aio_get_result() - return result.dict() - return created_task.dict() + else: + return created_task.to_dict() + + await asyncio.sleep(self.__sleep_time) + + return await get_async_result(result_payload=self._get_result_params.to_dict(), sleep_time=self.__sleep_time) async def _aio_create_task(self, url_postfix: str = CREATE_TASK_POSTFIX) -> CreateTaskResponseSer: """ @@ -161,7 +193,9 @@ class BaseCaptcha: """ async with aiohttp.ClientSession() as session: try: - async with session.post(parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self._params.dict()) as resp: + async with session.post( + parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self.create_task_payload.to_dict() + ) as resp: if resp.status == 200: return CreateTaskResponseSer(**await resp.json()) else: @@ -170,39 +204,6 @@ class BaseCaptcha: logging.exception(error) raise - async def _aio_get_result(self) -> GetTaskResultResponseSer: - """ - Method send SYNC `getTaskResult` request to service and wait for result - """ - # initial waiting - await asyncio.sleep(self.__sleep_time) - - attempts = attempts_generator() - async with aiohttp.ClientSession() as session: - for _ in attempts: - try: - async with session.post( - parse.urljoin(BASE_REQUEST_URL, GET_RESULT_POSTFIX), - json=self._get_result_params.dict(), - ) as task_result_response: - if task_result_response.status == 200: - task_result_data = GetTaskResultResponseSer(**await task_result_response.json()) - - if task_result_data.errorId == 0: - if task_result_data.status == ResponseStatusEnm.processing: - time.sleep(self.__sleep_time) - else: - task_result_data.taskId = self._get_result_params.taskId - return task_result_data - else: - task_result_data.taskId = self._get_result_params.taskId - return task_result_data - else: - raise ValueError(task_result_response.raise_for_status()) - except Exception as error: - logging.exception(error) - raise - @staticmethod async def _aio_send_post_request(payload: Optional[dict] = None, url_postfix: str = CREATE_TASK_POSTFIX) -> dict: """