Update base.py

This commit is contained in:
Andrei 2024-10-05 21:11:31 +03:00
parent d27858955c
commit 32d95c11c5
1 changed files with 100 additions and 99 deletions

View File

@ -1,30 +1,25 @@
import os
import time
import uuid
import base64
import asyncio
import logging
from typing import Union, Optional
from typing import Optional
from urllib import parse
from pathlib import Path
import aiohttp
import requests
from requests.adapters import HTTPAdapter
from python3_anticaptcha.core.enum import CaptchaTypeEnm, ResponseStatusEnm
from python3_anticaptcha.core.config import (
RETRIES,
BASE_REQUEST_URL,
GET_RESULT_POSTFIX,
CREATE_TASK_POSTFIX,
attempts_generator,
)
from python3_anticaptcha.core.serializer import (
CreateTaskRequestSer,
CreateTaskResponseSer,
GetTaskResultRequestSer,
GetTaskResultResponseSer,
)
from .enum import SaveFormatsEnm
from .config import RETRIES, BASE_REQUEST_URL, CREATE_TASK_POSTFIX
from .serializer import CreateTaskBaseSer, CreateTaskResponseSer, GetTaskResultRequestSer, GetTaskResultResponseSer
from .result_handler import get_sync_result, get_async_result
class BaseCaptcha:
NO_CAPTCHA_ERR = "You did not send any file, local link or URL."
"""
Basic Captcha solving class
@ -35,24 +30,16 @@ class BaseCaptcha:
request_url: API address for sending requests
"""
def __init__(
self,
api_key: str,
captcha_type: Union[CaptchaTypeEnm, str],
sleep_time: int = 15,
**kwargs,
):
# validate captcha_type parameter
if captcha_type not in CaptchaTypeEnm.list_values():
raise ValueError(f"Invalid `captcha_type` parameter set, available - {CaptchaTypeEnm.list_values()}")
def __init__(self, api_key: str, sleep_time: int = 15):
self.__sleep_time = sleep_time
# assign args to validator
self._params = CreateTaskRequestSer(clientKey=api_key, **locals())
self.create_task_payload = CreateTaskBaseSer(clientKey=api_key)
# `task` body for task creation payload
self.task_params = {}
# prepare `get task result` payload
self._get_result_params = GetTaskResultRequestSer(clientKey=api_key)
self.result = GetTaskResultResponseSer()
# prepare session
self._session = requests.Session()
@ -66,21 +53,27 @@ class BaseCaptcha:
def _processing_captcha(self) -> dict:
# added task params to payload
self._params.task = self.task_params
self.create_task_payload.task.update(self.task_params)
created_task = self._create_task()
if created_task.errorId == 0:
self._get_result_params.taskId = created_task.taskId
return self._get_result().dict()
return created_task.dict()
else:
return created_task.to_dict()
time.sleep(self.__sleep_time)
return get_sync_result(result_payload=self._get_result_params, sleep_time=self.__sleep_time)
def _create_task(self, url_postfix: str = CREATE_TASK_POSTFIX) -> CreateTaskResponseSer:
"""
Function send SYNC request to service and wait for result
"""
try:
resp = self._session.post(parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self._params.dict())
resp = self._session.post(
parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self.create_task_payload.to_dict()
)
if resp.status_code == 200:
return CreateTaskResponseSer(**resp.json())
else:
@ -89,37 +82,6 @@ class BaseCaptcha:
logging.exception(error)
raise
def _get_result(self) -> GetTaskResultResponseSer:
"""
Method send SYNC `getTaskResult` request to service and wait for result
"""
# initial waiting
time.sleep(self.__sleep_time)
attempts = attempts_generator()
for _ in attempts:
try:
task_result_response = self._session.post(
parse.urljoin(BASE_REQUEST_URL, GET_RESULT_POSTFIX), json=self._get_result_params.dict()
)
if task_result_response.status_code == 200:
task_result_data = GetTaskResultResponseSer(**task_result_response.json())
if task_result_data.errorId == 0:
if task_result_data.status == ResponseStatusEnm.processing:
time.sleep(self.__sleep_time)
else:
task_result_data.taskId = self._get_result_params.taskId
return task_result_data
else:
task_result_data.taskId = self._get_result_params.taskId
return task_result_data
else:
raise ValueError(task_result_response.raise_for_status())
except Exception as error:
logging.exception(error)
raise
@staticmethod
def _send_post_request(
payload: Optional[dict] = None,
@ -139,21 +101,91 @@ class BaseCaptcha:
logging.exception(error)
raise
def url_open(self, url: str, **kwargs):
"""
Method open links
"""
return self._session.get(url=url, **kwargs)
@staticmethod
def _local_file_captcha(captcha_file: str):
"""
Method get local file, read it and prepare for sending to Captcha solving service
"""
with open(captcha_file, "rb") as file:
return file.read()
def _file_const_saver(self, content: bytes, file_path: str, file_extension: str = "png"):
"""
Method create and save file in folder
"""
Path(file_path).mkdir(parents=True, exist_ok=True)
# generate image name
self._file_name = f"file-{uuid.uuid4()}.{file_extension}"
# save image to folder
with open(os.path.join(file_path, self._file_name), "wb") as out_image:
out_image.write(content)
def _body_file_processing(
self,
save_format: SaveFormatsEnm,
file_path: str,
file_extension: str = "png",
captcha_link: Optional[str] = None,
captcha_file: Optional[str] = None,
captcha_base64: Optional[bytes] = None,
**kwargs,
):
# if a local file link is passed
if captcha_file:
self.create_task_payload.task.update(
{"body": base64.b64encode(self._local_file_captcha(captcha_file)).decode("utf-8")}
)
# if the file is transferred in base64 encoding
elif captcha_base64:
self.create_task_payload.task.update({"body": base64.b64encode(captcha_base64).decode("utf-8")})
# if a URL is passed
elif captcha_link:
try:
content = self.url_open(url=captcha_link, **kwargs).content
logging.warning(f"{save_format = }")
logging.warning(f"{save_format == SaveFormatsEnm.CONST.value = }")
logging.warning(f"{save_format == SaveFormatsEnm.CONST = }")
# according to the value of the passed parameter, select the function to save the image
if save_format == SaveFormatsEnm.CONST.value:
logging.warning(f"TRY TO CREATE FOLDER WITH FILE")
self._file_const_saver(content, file_path, file_extension=file_extension)
logging.warning(f"TRY TO CREATE FOLDER")
self.create_task_payload.task.update({"body": base64.b64encode(content).decode("utf-8")})
except Exception as error:
self.result.errorId = 12
self.result.errorCode = self.NO_CAPTCHA_ERR
self.result.errorDescription = str(error)
else:
self.result.errorId = 12
self.result.errorCode = self.NO_CAPTCHA_ERR
"""
Async part
"""
async def _aio_processing_captcha(self) -> dict:
# added task params to payload
self._params.task = self.task_params
self.create_task_payload.task = self.task_params
created_task = await self._aio_create_task()
if created_task.errorId == 0:
self._get_result_params.taskId = created_task.taskId
result = await self._aio_get_result()
return result.dict()
return created_task.dict()
else:
return created_task.to_dict()
await asyncio.sleep(self.__sleep_time)
return await get_async_result(result_payload=self._get_result_params.to_dict(), sleep_time=self.__sleep_time)
async def _aio_create_task(self, url_postfix: str = CREATE_TASK_POSTFIX) -> CreateTaskResponseSer:
"""
@ -161,7 +193,9 @@ class BaseCaptcha:
"""
async with aiohttp.ClientSession() as session:
try:
async with session.post(parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self._params.dict()) as resp:
async with session.post(
parse.urljoin(BASE_REQUEST_URL, url_postfix), json=self.create_task_payload.to_dict()
) as resp:
if resp.status == 200:
return CreateTaskResponseSer(**await resp.json())
else:
@ -170,39 +204,6 @@ class BaseCaptcha:
logging.exception(error)
raise
async def _aio_get_result(self) -> GetTaskResultResponseSer:
"""
Method send SYNC `getTaskResult` request to service and wait for result
"""
# initial waiting
await asyncio.sleep(self.__sleep_time)
attempts = attempts_generator()
async with aiohttp.ClientSession() as session:
for _ in attempts:
try:
async with session.post(
parse.urljoin(BASE_REQUEST_URL, GET_RESULT_POSTFIX),
json=self._get_result_params.dict(),
) as task_result_response:
if task_result_response.status == 200:
task_result_data = GetTaskResultResponseSer(**await task_result_response.json())
if task_result_data.errorId == 0:
if task_result_data.status == ResponseStatusEnm.processing:
time.sleep(self.__sleep_time)
else:
task_result_data.taskId = self._get_result_params.taskId
return task_result_data
else:
task_result_data.taskId = self._get_result_params.taskId
return task_result_data
else:
raise ValueError(task_result_response.raise_for_status())
except Exception as error:
logging.exception(error)
raise
@staticmethod
async def _aio_send_post_request(payload: Optional[dict] = None, url_postfix: str = CREATE_TASK_POSTFIX) -> dict:
"""