mirror of
https://github.com/ThePhaseless/Byparr.git
synced 2025-03-15 01:40:21 +08:00
add title parsing
This commit is contained in:
parent
c8f02c1102
commit
c4cb6e0ac3
19
main.py
19
main.py
@ -4,10 +4,14 @@ import logging
|
||||
import time
|
||||
|
||||
import uvicorn.config
|
||||
from fastapi import FastAPI
|
||||
from bs4 import BeautifulSoup
|
||||
from fastapi import FastAPI, HTTPException
|
||||
from fastapi.responses import RedirectResponse
|
||||
from sbase import SB, BaseCase
|
||||
|
||||
import src
|
||||
import src.utils
|
||||
import src.utils.consts
|
||||
from src.models.requests import LinkRequest, LinkResponse, Solution
|
||||
from src.utils import logger
|
||||
from src.utils.consts import LOG_LEVEL
|
||||
@ -49,6 +53,17 @@ def read_item(request: LinkRequest):
|
||||
sb.save_screenshot("screenshot.png")
|
||||
logger.info(f"Got webpage: {request.url}")
|
||||
|
||||
source = sb.get_page_source()
|
||||
source_bs = BeautifulSoup(source, "html.parser")
|
||||
title_tag = source_bs.title
|
||||
if title_tag is None:
|
||||
raise HTTPException(status_code=500, detail="Title tag not found")
|
||||
|
||||
if title_tag.string in src.utils.consts.CHALLENGE_TITLES:
|
||||
raise HTTPException(status_code=500, detail="Could not bypass challenge")
|
||||
|
||||
title = title_tag.string
|
||||
logger.info(f"Title: {title}")
|
||||
response = LinkResponse(
|
||||
message="Success",
|
||||
solution=Solution(
|
||||
@ -57,7 +72,7 @@ def read_item(request: LinkRequest):
|
||||
status=200,
|
||||
cookies=sb.get_cookies(),
|
||||
headers={},
|
||||
response=sb.get_page_source(),
|
||||
response=source,
|
||||
),
|
||||
startTimestamp=start_time,
|
||||
)
|
||||
|
2
poetry.lock
generated
2
poetry.lock
generated
@ -2165,4 +2165,4 @@ h11 = ">=0.9.0,<1"
|
||||
[metadata]
|
||||
lock-version = "2.0"
|
||||
python-versions = "^3.12"
|
||||
content-hash = "b2b7fe1981a31791499292ae043fc0da7faa15b354d5faa8668f793f3ece58f3"
|
||||
content-hash = "1dcc6c3a9ff83a4e27c96b1047a388e1ddd9a7c326b20ef07793c0721f9594dc"
|
||||
|
@ -16,6 +16,7 @@ pytest-asyncio = "^0"
|
||||
ruff = "^0.8.0"
|
||||
seleniumbase = "^4.32.12"
|
||||
pyautogui = "^0.9.54"
|
||||
beautifulsoup4 = "^4.12.3"
|
||||
|
||||
|
||||
[build-system]
|
||||
|
@ -1,11 +1,10 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import time
|
||||
from http import HTTPStatus
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
from seleniumbase.undetected.cdp_driver.tab import Tab
|
||||
|
||||
|
||||
class LinkRequest(BaseModel):
|
||||
@ -26,6 +25,17 @@ class Solution(BaseModel):
|
||||
headers: dict[str, Any]
|
||||
response: str
|
||||
|
||||
@classmethod
|
||||
def empty(cls):
|
||||
return cls(
|
||||
url="",
|
||||
status=HTTPStatus.INTERNAL_SERVER_ERROR,
|
||||
cookies=[],
|
||||
userAgent="",
|
||||
headers={},
|
||||
response="",
|
||||
)
|
||||
|
||||
|
||||
class LinkResponse(BaseModel):
|
||||
status: str = "ok"
|
||||
@ -36,46 +46,16 @@ class LinkResponse(BaseModel):
|
||||
version: str = "3.3.21" # TODO: Implement versioning
|
||||
|
||||
@classmethod
|
||||
async def create(
|
||||
cls,
|
||||
page: Tab,
|
||||
start_timestamp: int,
|
||||
*,
|
||||
challenged: bool = False,
|
||||
):
|
||||
message = "Passed challenge" if challenged else "Challenge not detected"
|
||||
|
||||
user_agent = await cls.get_useragent(page)
|
||||
|
||||
# cookies = await page.browser.cookies.get_all(requests_cookie_format=True)
|
||||
# # Convert cookies to json
|
||||
# cookies = [cookie.to_json() for cookie in cookies]
|
||||
|
||||
cookies = await page.browser.cookies.get_all()
|
||||
solution = Solution(
|
||||
url=page.url,
|
||||
status=200,
|
||||
cookies=cookies if cookies else [],
|
||||
userAgent=user_agent,
|
||||
headers={},
|
||||
response=await page.get_content(),
|
||||
)
|
||||
|
||||
def invalid(cls):
|
||||
return cls(
|
||||
message=message,
|
||||
solution=solution,
|
||||
startTimestamp=start_timestamp,
|
||||
status="error",
|
||||
message="Invalid request",
|
||||
solution=Solution.empty(),
|
||||
startTimestamp=int(time.time() * 1000),
|
||||
endTimestamp=int(time.time() * 1000),
|
||||
version="3.3.21",
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def get_useragent(cls, page):
|
||||
user_agent = await page.js_dumps("navigator")
|
||||
if not isinstance(user_agent, dict):
|
||||
raise ProtectionTriggeredError("User agent is not a dictionary")
|
||||
user_agent = user_agent["userAgent"]
|
||||
re.sub(pattern="HEADLESS", repl="", string=user_agent, flags=re.IGNORECASE)
|
||||
return user_agent
|
||||
|
||||
|
||||
class NoChromeExtensionError(Exception):
|
||||
"""No chrome extention found."""
|
||||
|
@ -3,3 +3,10 @@ import os
|
||||
|
||||
LOG_LEVEL = os.getenv("LOG_LEVEL") or "INFO"
|
||||
LOG_LEVEL = logging.getLevelNamesMapping()[LOG_LEVEL.upper()]
|
||||
|
||||
CHALLENGE_TITLES = [
|
||||
# Cloudflare
|
||||
"Just a moment...",
|
||||
# DDoS-GUARD
|
||||
"DDoS-Guard",
|
||||
]
|
||||
|
Loading…
x
Reference in New Issue
Block a user