From c4cb6e0ac36784aab3805b7e0a1b3f83083e5b2a Mon Sep 17 00:00:00 2001 From: Thephaseless Date: Sun, 24 Nov 2024 23:04:19 +0000 Subject: [PATCH] add title parsing --- main.py | 19 ++++++++++++-- poetry.lock | 2 +- pyproject.toml | 1 + src/models/requests.py | 58 ++++++++++++++---------------------------- src/utils/consts.py | 7 +++++ 5 files changed, 45 insertions(+), 42 deletions(-) diff --git a/main.py b/main.py index a2aeac6..be19e25 100644 --- a/main.py +++ b/main.py @@ -4,10 +4,14 @@ import logging import time import uvicorn.config -from fastapi import FastAPI +from bs4 import BeautifulSoup +from fastapi import FastAPI, HTTPException from fastapi.responses import RedirectResponse from sbase import SB, BaseCase +import src +import src.utils +import src.utils.consts from src.models.requests import LinkRequest, LinkResponse, Solution from src.utils import logger from src.utils.consts import LOG_LEVEL @@ -49,6 +53,17 @@ def read_item(request: LinkRequest): sb.save_screenshot("screenshot.png") logger.info(f"Got webpage: {request.url}") + source = sb.get_page_source() + source_bs = BeautifulSoup(source, "html.parser") + title_tag = source_bs.title + if title_tag is None: + raise HTTPException(status_code=500, detail="Title tag not found") + + if title_tag.string in src.utils.consts.CHALLENGE_TITLES: + raise HTTPException(status_code=500, detail="Could not bypass challenge") + + title = title_tag.string + logger.info(f"Title: {title}") response = LinkResponse( message="Success", solution=Solution( @@ -57,7 +72,7 @@ def read_item(request: LinkRequest): status=200, cookies=sb.get_cookies(), headers={}, - response=sb.get_page_source(), + response=source, ), startTimestamp=start_time, ) diff --git a/poetry.lock b/poetry.lock index 0a82b58..95896a0 100644 --- a/poetry.lock +++ b/poetry.lock @@ -2165,4 +2165,4 @@ h11 = ">=0.9.0,<1" [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "b2b7fe1981a31791499292ae043fc0da7faa15b354d5faa8668f793f3ece58f3" +content-hash = "1dcc6c3a9ff83a4e27c96b1047a388e1ddd9a7c326b20ef07793c0721f9594dc" diff --git a/pyproject.toml b/pyproject.toml index b3eef47..c6b3e1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ pytest-asyncio = "^0" ruff = "^0.8.0" seleniumbase = "^4.32.12" pyautogui = "^0.9.54" +beautifulsoup4 = "^4.12.3" [build-system] diff --git a/src/models/requests.py b/src/models/requests.py index c6daad1..1b83ab6 100644 --- a/src/models/requests.py +++ b/src/models/requests.py @@ -1,11 +1,10 @@ from __future__ import annotations -import re import time +from http import HTTPStatus from typing import Any from pydantic import BaseModel -from seleniumbase.undetected.cdp_driver.tab import Tab class LinkRequest(BaseModel): @@ -26,6 +25,17 @@ class Solution(BaseModel): headers: dict[str, Any] response: str + @classmethod + def empty(cls): + return cls( + url="", + status=HTTPStatus.INTERNAL_SERVER_ERROR, + cookies=[], + userAgent="", + headers={}, + response="", + ) + class LinkResponse(BaseModel): status: str = "ok" @@ -36,46 +46,16 @@ class LinkResponse(BaseModel): version: str = "3.3.21" # TODO: Implement versioning @classmethod - async def create( - cls, - page: Tab, - start_timestamp: int, - *, - challenged: bool = False, - ): - message = "Passed challenge" if challenged else "Challenge not detected" - - user_agent = await cls.get_useragent(page) - - # cookies = await page.browser.cookies.get_all(requests_cookie_format=True) - # # Convert cookies to json - # cookies = [cookie.to_json() for cookie in cookies] - - cookies = await page.browser.cookies.get_all() - solution = Solution( - url=page.url, - status=200, - cookies=cookies if cookies else [], - userAgent=user_agent, - headers={}, - response=await page.get_content(), - ) - + def invalid(cls): return cls( - message=message, - solution=solution, - startTimestamp=start_timestamp, + status="error", + message="Invalid request", + solution=Solution.empty(), + startTimestamp=int(time.time() * 1000), + endTimestamp=int(time.time() * 1000), + version="3.3.21", ) - @classmethod - async def get_useragent(cls, page): - user_agent = await page.js_dumps("navigator") - if not isinstance(user_agent, dict): - raise ProtectionTriggeredError("User agent is not a dictionary") - user_agent = user_agent["userAgent"] - re.sub(pattern="HEADLESS", repl="", string=user_agent, flags=re.IGNORECASE) - return user_agent - class NoChromeExtensionError(Exception): """No chrome extention found.""" diff --git a/src/utils/consts.py b/src/utils/consts.py index 9763ec1..5834160 100644 --- a/src/utils/consts.py +++ b/src/utils/consts.py @@ -3,3 +3,10 @@ import os LOG_LEVEL = os.getenv("LOG_LEVEL") or "INFO" LOG_LEVEL = logging.getLevelNamesMapping()[LOG_LEVEL.upper()] + +CHALLENGE_TITLES = [ + # Cloudflare + "Just a moment...", + # DDoS-GUARD + "DDoS-Guard", +]