separate files, add gzip, save screenshots on exception, add PROXY support

This commit is contained in:
ThePhaseless 2025-02-17 23:08:14 +00:00
parent 4fedb90cf5
commit dd251174e4
15 changed files with 187 additions and 252 deletions

2
.gitignore vendored

@@ -165,7 +165,7 @@ cython_debug/
core
# Screenshots
*.png
screenshots/
# Downloaded files
downloaded_files/

View File

@@ -27,8 +27,9 @@ An alternative to [FlareSolverr](https://github.com/FlareSolverr/FlareSolverr) a
| Env | Default | Description |
| -------------- | ---------------------- | ------------------------------------------------------------------------------------------------------------------------------- |
| `USE_XVFB`     | `false`                | Use virtual desktop with Xvfb. (Linux only) (Can cause a performance hit [#14](https://github.com/ThePhaseless/Byparr/issues/14)) |
| `USE_HEADLESS` | `true/false on docker` | Use headless Chromium. |
| `USE_XVFB`     | `SeleniumBase default` | Use virtual desktop with Xvfb. (Linux only) (Can cause a performance hit [#14](https://github.com/ThePhaseless/Byparr/issues/14)) |
| `USE_HEADLESS` | `SeleniumBase default` | Use headless Chromium. |
| `PROXY` | `` | Proxy to use. (format: `username:password@host:port`) |
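A minimal launch sketch showing how these variables could be set from Python before starting the app (the `main:app` target, bind address, and port 8191 are assumptions taken from this repo's compose file):

```python
import os

# Leave USE_XVFB / USE_HEADLESS unset to keep the SeleniumBase defaults;
# "true"/"1" forces the feature on, any other non-empty value forces it off.
os.environ["USE_HEADLESS"] = "true"
# Proxy in username:password@host:port form (placeholder credentials).
os.environ["PROXY"] = "user:pass@proxy.example.com:8080"

import uvicorn

uvicorn.run("main:app", host="0.0.0.0", port=8191)
```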
## Tags

View File

@@ -6,5 +6,7 @@ services:
dockerfile: Dockerfile
environment:
- LOG_LEVEL=INFO
volumes:
- ./screenshots:/app/screenshots # For screenshots when exception occurs
ports:
- "8191:8191"

128
main.py

@@ -1,132 +1,20 @@
from __future__ import annotations
import logging
import time
from http import HTTPStatus
import uvicorn
from bs4 import BeautifulSoup
from fastapi import FastAPI, HTTPException
from fastapi.responses import RedirectResponse
from sbase import SB, BaseCase
from fastapi import FastAPI
from fastapi.middleware.gzip import GZipMiddleware
import src
import src.utils
import src.utils.consts
from src.models.requests import LinkRequest, LinkResponse, Solution
from src.utils import logger
from src.utils.consts import LOG_LEVEL, USE_HEADLESS, USE_XVFB
from src.consts import LOG_LEVEL
from src.endpoints import router
from src.middlewares import LogRequest
app = FastAPI(debug=LOG_LEVEL == logging.DEBUG, log_level=LOG_LEVEL)
app.add_middleware(GZipMiddleware)
app.add_middleware(LogRequest)
cookies = []
@app.get("/")
def read_root():
"""Redirect to /docs."""
logger.debug("Redirecting to /docs")
return RedirectResponse(url="/docs", status_code=301)
@app.get("/health")
async def health_check():
"""Health check endpoint."""
health_check_request = read_item(
LinkRequest.model_construct(url="https://prowlarr.servarr.com/v1/ping")
)
if health_check_request.solution.status != HTTPStatus.OK:
raise HTTPException(
status_code=500,
detail="Health check failed",
)
return {"status": "ok"}
@app.post("/v1")
def read_item(request: LinkRequest) -> LinkResponse:
"""Handle POST requests."""
start_time = int(time.time() * 1000)
# request.url = "https://nowsecure.nl"
logger.info(f"Request: {request}")
# Check if the string is a URL
if not (request.url.startswith("http://") or request.url.startswith("https://")):
return LinkResponse.invalid(request.url)
response: LinkResponse
# start_time = int(time.time() * 1000)
with SB(
uc=True,
locale_code="en",
test=False,
ad_block=True,
xvfb=USE_XVFB,
headless=USE_HEADLESS,
) as sb:
try:
sb: BaseCase
global cookies # noqa: PLW0603
if cookies:
sb.uc_open_with_reconnect(request.url)
sb.add_cookies(cookies)
sb.uc_open_with_reconnect(request.url)
source = sb.get_page_source()
source_bs = BeautifulSoup(source, "html.parser")
title_tag = source_bs.title
logger.debug(f"Got webpage: {request.url}")
if title_tag and title_tag.string in src.utils.consts.CHALLENGE_TITLES:
logger.debug("Challenge detected")
sb.uc_gui_click_captcha()
logger.info("Clicked captcha")
source = sb.get_page_source()
source_bs = BeautifulSoup(source, "html.parser")
title_tag = source_bs.title
if title_tag and title_tag.string in src.utils.consts.CHALLENGE_TITLES:
sb.save_screenshot(f"./screenshots/{request.url}.png")
raise_captcha_bypass_error()
response = LinkResponse(
message="Success",
solution=Solution(
userAgent=sb.get_user_agent(),
url=sb.get_current_url(),
status=200,
cookies=sb.get_cookies(),
headers={},
response=source,
),
startTimestamp=start_time,
)
cookies = sb.get_cookies()
except Exception as e:
logger.error(f"Error: {e}")
if sb.driver:
sb.driver.quit()
raise HTTPException(
status_code=500, detail="Unknown error, check logs"
) from e
return response
def raise_captcha_bypass_error():
"""
Raise a 500 error if the challenge could not be bypassed.
This function should be called if the challenge is not bypassed after
clicking the captcha.
Returns:
None
"""
raise HTTPException(status_code=500, detail="Could not bypass challenge")
app.include_router(router=router)
if __name__ == "__main__":

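As an aside, the new `GZipMiddleware` can be sanity-checked from any client that advertises gzip support; a sketch using httpx as an illustrative client:

```python
import httpx

# /docs is well above GZipMiddleware's default size threshold, so the response
# should arrive compressed when the client sends Accept-Encoding: gzip
# (httpx does this by default and decompresses transparently).
resp = httpx.get("http://localhost:8191/docs")
print(resp.status_code, resp.headers.get("content-encoding"))  # expected: 200 gzip
```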
View File

View File

@@ -27,8 +27,13 @@ LOG_LEVEL = logging.getLevelNamesMapping()[LOG_LEVEL.upper()]
VERSION = get_version_from_env() or "unknown"
USE_XVFB = os.getenv("USE_XVFB", "false") in ["true", "1"]
USE_HEADLESS = os.getenv("USE_HEADLESS", "true").lower() in ["true", "1"]
USE_XVFB = os.getenv("USE_XVFB") in ["true", "1"] if os.getenv("USE_XVFB") else None
USE_HEADLESS = (
os.getenv("USE_HEADLESS") in ["true", "1"] if os.getenv("USE_HEADLESS") else None
)
PROXY = os.getenv("PROXY")
CHALLENGE_TITLES = [
# Cloudflare

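The flags are now tri-state: an unset variable defers to SeleniumBase instead of forcing a value. A small sketch of the resulting behaviour:

```python
import os


def flag(name: str) -> bool | None:
    """Mirror the parsing above: unset -> None, otherwise truthy only for 'true'/'1'."""
    return os.getenv(name) in ["true", "1"] if os.getenv(name) else None


os.environ.pop("USE_XVFB", None)
print(flag("USE_XVFB"))  # None -> SeleniumBase default applies

os.environ["USE_XVFB"] = "1"
print(flag("USE_XVFB"))  # True

os.environ["USE_XVFB"] = "false"
print(flag("USE_XVFB"))  # False
```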
78
src/endpoints.py Normal file

@@ -0,0 +1,78 @@
import time
from http import HTTPStatus
from typing import Annotated
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from sbase import BaseCase
from src.consts import CHALLENGE_TITLES
from src.models import (
LinkRequest,
LinkResponse,
Solution,
)
from .utils import get_sb, logger, save_screenshot
router = APIRouter()
SeleniumDep = Annotated[BaseCase, Depends(get_sb)]
@router.get("/", include_in_schema=False)
def read_root():
"""Redirect to /docs."""
logger.debug("Redirecting to /docs")
return RedirectResponse(url="/docs", status_code=301)
@router.get("/health")
def health_check(sb: SeleniumDep):
"""Health check endpoint."""
health_check_request = read_item(
LinkRequest.model_construct(url="https://prowlarr.servarr.com/v1/ping"),
sb,
)
if health_check_request.solution.status != HTTPStatus.OK:
raise HTTPException(
status_code=500,
detail="Health check failed",
)
return {"status": "ok"}
@router.post("/v1")
def read_item(request: LinkRequest, sb: SeleniumDep) -> LinkResponse:
"""Handle POST requests."""
start_time = int(time.time() * 1000)
sb.uc_open_with_reconnect(request.url)
logger.debug(f"Got webpage: {request.url}")
source_bs = sb.get_beautiful_soup()
title_tag = source_bs.title
if title_tag and title_tag.string in CHALLENGE_TITLES:
logger.debug("Challenge detected")
sb.uc_gui_click_captcha()
logger.info("Clicked captcha")
source_bs = sb.get_beautiful_soup()
title_tag = source_bs.title
if title_tag and title_tag.string in CHALLENGE_TITLES:
save_screenshot(sb)
raise HTTPException(status_code=500, detail="Could not bypass challenge")
return LinkResponse(
message="Success",
solution=Solution(
userAgent=sb.get_user_agent(),
url=sb.get_current_url(),
status=200,
cookies=sb.get_cookies(),
headers={},
response=str(source_bs),
),
start_timestamp=start_time,
)
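A sketch of exercising the new router end to end with FastAPI's TestClient, mirroring the repo's tests (URL and field values are illustrative, and the call drives a real browser via the `get_sb` dependency):

```python
from http import HTTPStatus

from starlette.testclient import TestClient

from main import app

client = TestClient(app)

# /health round-trips through read_item() against the Prowlarr ping endpoint.
assert client.get("/health").status_code == HTTPStatus.OK

# /v1 accepts a FlareSolverr-style body and returns the solved page in `solution`.
resp = client.post(
    "/v1",
    json={"cmd": "request.get", "url": "https://example.com", "max_timeout": 60},
)
solution = resp.json()["solution"]
print(solution["status"], solution["url"])
```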

24
src/middlewares.py Normal file

@@ -0,0 +1,24 @@
import time
from starlette.middleware.base import BaseHTTPMiddleware
from src.models import LinkRequest
from src.utils import logger
class LogRequest(BaseHTTPMiddleware):
async def dispatch(self, request, call_next):
"""Log requests."""
if request.url.path != "/v1":
return await call_next(request)
start_time = time.perf_counter()
request_body = LinkRequest.model_validate(await request.json())
logger.info(
f"From: {request.client.host if request.client else 'unknown'} at {time.strftime('%Y-%m-%d %H:%M:%S')}: {request_body.url}"
)
response = await call_next(request)
process_time = time.perf_counter() - start_time
logger.info(f"Done {request_body.url} in {process_time:.2f}s")
return response
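The middleware is registered in main.py via `add_middleware`; a quick sketch of the wiring and the kind of lines it emits (host, timestamp, and timing are illustrative):

```python
from fastapi import FastAPI

from src.middlewares import LogRequest

app = FastAPI()
app.add_middleware(LogRequest)

# For each POST /v1 the middleware logs roughly:
#   From: 172.17.0.1 at 2025-02-17 23:08:14: https://example.com
#   Done https://example.com in 4.21s
```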

View File

@@ -2,21 +2,24 @@ from __future__ import annotations
import time
from http import HTTPStatus
from typing import Any
from typing import Annotated, Any
from pydantic import BaseModel, Field
from fastapi import Body
from pydantic import BaseModel
from src.utils import consts
from src import consts
class LinkRequest(BaseModel):
cmd: str = "get"
url: str
max_timeout: int = Field(30, alias="maxTimeout")
class ProtectionTriggeredError(Exception):
pass
cmd: Annotated[
str,
Body(
default="request.get",
description="Type of request, currently only supports GET requests. This string is purely for compatibility with FlareSolverr.",
),
]
url: Annotated[str, Body(pattern=r"^https?://", default="https://")]
max_timeout: Annotated[int, Body(default=60)]
class Solution(BaseModel):
@@ -48,8 +51,10 @@ class LinkResponse(BaseModel):
status: str = "ok"
message: str
solution: Solution
startTimestamp: int # noqa: N815 # Ignore to preserve compatibility
endTimestamp: int = int(time.time() * 1000) # noqa: N815 # Ignore to preserve compatibility
start_timestamp: Annotated[int, Body(alias="startTimestamp")] = int(
time.time() * 1000
)
end_timestamp: Annotated[int, Body(alias="endTimestamp")] = int(time.time() * 1000)
version: str = consts.VERSION
@classmethod
@@ -63,10 +68,6 @@ class LinkResponse(BaseModel):
status="error",
message="Invalid request",
solution=Solution.invalid(url),
startTimestamp=int(time.time() * 1000),
endTimestamp=int(time.time() * 1000),
start_timestamp=int(time.time() * 1000),
end_timestamp=int(time.time() * 1000),
)
class NoChromeExtensionError(Exception):
"""No chrome extension found."""

View File

View File

@@ -1,95 +0,0 @@
from __future__ import annotations
from pydantic import BaseModel
class Author(BaseModel):
login: str
id: int
node_id: str
avatar_url: str
gravatar_id: str
url: str
html_url: str
followers_url: str
following_url: str
gists_url: str
starred_url: str
subscriptions_url: str
organizations_url: str
repos_url: str
events_url: str
received_events_url: str
type: str
site_admin: bool
class Uploader(BaseModel):
login: str
id: int
node_id: str
avatar_url: str
gravatar_id: str
url: str
html_url: str
followers_url: str
following_url: str
gists_url: str
starred_url: str
subscriptions_url: str
organizations_url: str
repos_url: str
events_url: str
received_events_url: str
type: str
site_admin: bool
class Asset(BaseModel):
url: str
id: int
node_id: str
name: str
label: str | None
uploader: Uploader
content_type: str
state: str
size: int
download_count: int
created_at: str
updated_at: str
browser_download_url: str
class Reactions(BaseModel):
url: str
total_count: int
laugh: int
hooray: int
confused: int
heart: int
rocket: int
eyes: int
class GithubResponse(BaseModel):
url: str
assets_url: str
upload_url: str
html_url: str
id: int
author: Author
node_id: str
tag_name: str
target_commitish: str
name: str
draft: bool
prerelease: bool
created_at: str
published_at: str
assets: list[Asset]
tarball_url: str
zipball_url: str
body: str
reactions: Reactions

36
src/utils.py Normal file

@@ -0,0 +1,36 @@
import logging
from time import gmtime, strftime
from fastapi import Header
from sbase import SB, BaseCase
from src.consts import LOG_LEVEL, PROXY, USE_HEADLESS
logger = logging.getLogger("uvicorn.error")
logger.setLevel(LOG_LEVEL)
if len(logger.handlers) == 0:
logger.addHandler(logging.StreamHandler())
def get_sb(
proxy: str | None = Header(
default=None,
example="username:password@host:port",
description="Override default proxy from env",
),
):
"""Get SeleniumBase instance."""
with SB(
uc=True,
headless=USE_HEADLESS,
headed=not USE_HEADLESS,
locale_code="en",
ad_block=True,
proxy=proxy or PROXY,
) as sb:
yield sb
def save_screenshot(sb: BaseCase):
"""Save screenshot on HTTPException."""
sb.save_screenshot(f"screenshots/{strftime('%Y-%m-%d %H:%M:%S', gmtime())}.png")
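`get_sb()` also lets a caller override the `PROXY` env value per request through a `proxy` header; a sketch with httpx (credentials and host are placeholders):

```python
import httpx

resp = httpx.post(
    "http://localhost:8191/v1",
    headers={"proxy": "username:password@host:port"},  # overrides the PROXY env default
    json={"cmd": "request.get", "url": "https://example.com", "max_timeout": 60},
    timeout=120,
)
print(resp.json()["message"])
```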

View File

@@ -1,8 +0,0 @@
import logging
from src.utils.consts import LOG_LEVEL
logger = logging.getLogger("uvicorn.error")
logger.setLevel(LOG_LEVEL)
if len(logger.handlers) == 0:
logger.addHandler(logging.StreamHandler())

View File

@@ -1,3 +1,3 @@
#!/bin/sh
uv run pytest --retries 3 -n auto
uv run pytest --retries 3

View File

@@ -5,7 +5,7 @@ import pytest
from starlette.testclient import TestClient
from main import app
from src.models.requests import LinkRequest
from src.models import LinkRequest
client = TestClient(app)
@@ -29,16 +29,19 @@ def test_bypass(website: str):
website,
)
if (
test_request.status_code != HTTPStatus.OK
test_request.status_code == HTTPStatus.OK
and "Just a moment..." not in test_request.text
):
pytest.skip(f"Skipping {website} due to {test_request.status_code}")
response = client.post(
"/v1",
json=LinkRequest.model_construct(
json={
**LinkRequest.model_construct(
url=website, max_timeout=30, cmd="request.get"
).model_dump(),
"proxy": "203.174.15.83:8080",
},
)
assert response.status_code == HTTPStatus.OK