mirror of
https://github.com/ThePhaseless/Byparr.git
synced 2025-03-15 01:40:21 +08:00
add healthcheck
This commit is contained in:
parent
a35167e533
commit
73b3967618
@ -27,4 +27,5 @@ COPY pyproject.toml poetry.lock ./
|
||||
RUN poetry install
|
||||
|
||||
COPY . .
|
||||
HEALTHCHECK --interval=60s --timeout=30s --start-period=5s --retries=3 CMD [ "curl", "http://localhost:8191/health" ]
|
||||
CMD ["./cmd.sh"]
|
@ -1,65 +0,0 @@
|
||||
# https://github.com/ultrafunkamsterdam/undetected-chromedriver/issues/1954
|
||||
# Fix for nodriver in .venv/lib/python3.11/site-packages/nodriver/core/browser.py
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import os
|
||||
from pathlib import Path
|
||||
from platform import python_version
|
||||
|
||||
env_path = os.getenv("VIRTUAL_ENV")
|
||||
if env_path is None:
|
||||
env_path = Path(os.__file__).parent.parent.parent.as_posix()
|
||||
python_version = python_version().split(".")[0:2]
|
||||
nodriver_path = Path(env_path + f"/lib/python{'.'.join(python_version)}/site-packages/nodriver/cdp/network.py")
|
||||
if not nodriver_path.exists():
|
||||
msg = f"{nodriver_path} not found"
|
||||
raise FileNotFoundError(msg)
|
||||
|
||||
new_cookie_partition_key = """\
|
||||
if isinstance(json, str):
|
||||
return cls(top_level_site=json, has_cross_site_ancestor=False)
|
||||
elif isinstance(json, dict):
|
||||
return cls(
|
||||
top_level_site=str(json["topLevelSite"]),
|
||||
has_cross_site_ancestor=bool(json["hasCrossSiteAncestor"]),
|
||||
)
|
||||
"""
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
handler = logging.StreamHandler()
|
||||
logger.addHandler(handler)
|
||||
logger.setLevel(logging.INFO)
|
||||
logger.info(f"Fixing nodriver in {nodriver_path}")
|
||||
# delete CookiePartitionKey declaration
|
||||
with nodriver_path.open("r+") as f:
|
||||
lines = f.readlines()
|
||||
found_def = False
|
||||
found_body = False
|
||||
i = -1
|
||||
while i < len(lines):
|
||||
i += 1
|
||||
line = lines[i]
|
||||
strip_line = line.strip("\n")
|
||||
if not found_def and line.startswith("class CookiePartitionKey:"):
|
||||
logger.info(f"Found line {i}: {strip_line}")
|
||||
found_def = True
|
||||
continue
|
||||
if found_def:
|
||||
if line.startswith(" def from_json"):
|
||||
logger.info(f"Found line {i}: {strip_line}")
|
||||
found_body = True
|
||||
continue
|
||||
if found_body:
|
||||
if line.startswith(("\t\t", " ")):
|
||||
logger.info(f"Removing line {i}: {strip_line}")
|
||||
lines.pop(i)
|
||||
i -= 1
|
||||
continue
|
||||
else:
|
||||
lines = lines[:i] + [new_cookie_partition_key] + lines[i:]
|
||||
break
|
||||
|
||||
|
||||
with nodriver_path.open("w") as f:
|
||||
f.writelines(lines)
|
76
main.py
76
main.py
@ -30,9 +30,16 @@ def read_root():
|
||||
async def health_check():
|
||||
"""Health check endpoint."""
|
||||
logger.info("Health check")
|
||||
# browser: Chrome = await new_browser()
|
||||
# browser.get("https://google.com")
|
||||
# browser.stop()
|
||||
|
||||
health_check_request = read_item(
|
||||
LinkRequest.model_construct(url="https://prowlarr.servarr.com/v1/ping")
|
||||
)
|
||||
if health_check_request.solution.status != 200:
|
||||
raise HTTPException(
|
||||
status_code=500,
|
||||
detail="Health check failed",
|
||||
)
|
||||
|
||||
return {"status": "ok"}
|
||||
|
||||
|
||||
@ -45,37 +52,44 @@ def read_item(request: LinkRequest):
|
||||
response: LinkResponse
|
||||
|
||||
# start_time = int(time.time() * 1000)
|
||||
with SB(uc=True, locale_code="en", test=False, xvfb=True, ad_block=True) as sb:
|
||||
sb: BaseCase
|
||||
sb.uc_open_with_reconnect(request.url)
|
||||
sb.uc_gui_click_captcha()
|
||||
logger.info(f"Got webpage: {request.url}")
|
||||
sb.save_screenshot("screenshot.png")
|
||||
logger.info(f"Got webpage: {request.url}")
|
||||
try:
|
||||
with SB(uc=True, locale_code="en", test=False, xvfb=True, ad_block=True) as sb:
|
||||
sb: BaseCase
|
||||
sb.uc_open_with_reconnect(request.url)
|
||||
source = sb.get_page_source()
|
||||
source_bs = BeautifulSoup(source, "html.parser")
|
||||
title_tag = source_bs.title
|
||||
logger.info(f"Got webpage: {request.url}")
|
||||
if title_tag in src.utils.consts.CHALLENGE_TITLES:
|
||||
logger.info("Challenge detected")
|
||||
sb.uc_gui_click_captcha()
|
||||
logger.info("Clicked captcha")
|
||||
sb.save_screenshot("screenshot.png")
|
||||
|
||||
source = sb.get_page_source()
|
||||
source_bs = BeautifulSoup(source, "html.parser")
|
||||
title_tag = source_bs.title
|
||||
if title_tag is None:
|
||||
raise HTTPException(status_code=500, detail="Title tag not found")
|
||||
source = sb.get_page_source()
|
||||
source_bs = BeautifulSoup(source, "html.parser")
|
||||
title_tag = source_bs.title
|
||||
|
||||
if title_tag.string in src.utils.consts.CHALLENGE_TITLES:
|
||||
raise HTTPException(status_code=500, detail="Could not bypass challenge")
|
||||
if title_tag and title_tag.string in src.utils.consts.CHALLENGE_TITLES:
|
||||
raise HTTPException(
|
||||
status_code=500, detail="Could not bypass challenge"
|
||||
)
|
||||
|
||||
title = title_tag.string
|
||||
logger.info(f"Title: {title}")
|
||||
response = LinkResponse(
|
||||
message="Success",
|
||||
solution=Solution(
|
||||
userAgent=sb.get_user_agent(),
|
||||
url=sb.get_current_url(),
|
||||
status=200,
|
||||
cookies=sb.get_cookies(),
|
||||
headers={},
|
||||
response=source,
|
||||
),
|
||||
startTimestamp=start_time,
|
||||
)
|
||||
response = LinkResponse(
|
||||
message="Success",
|
||||
solution=Solution(
|
||||
userAgent=sb.get_user_agent(),
|
||||
url=sb.get_current_url(),
|
||||
status=200,
|
||||
cookies=sb.get_cookies(),
|
||||
headers={},
|
||||
response=source,
|
||||
),
|
||||
startTimestamp=start_time,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error: {e}")
|
||||
raise HTTPException(status_code=500, detail="Unknown error, check logs") from e
|
||||
|
||||
return response
|
||||
|
||||
|
@ -4,13 +4,13 @@ import time
|
||||
from http import HTTPStatus
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class LinkRequest(BaseModel):
|
||||
cmd: str
|
||||
cmd: str = "get"
|
||||
url: str
|
||||
maxTimeout: int # noqa: N815 # Ignore to preserve compatibility
|
||||
max_timeout: int = Field(30, alias="maxTimeout")
|
||||
|
||||
|
||||
class ProtectionTriggeredError(Exception):
|
||||
|
@ -39,7 +39,9 @@ def test_bypass(website: str):
|
||||
|
||||
response = client.post(
|
||||
"/v1",
|
||||
json=LinkRequest(url=website, maxTimeout=30, cmd="request.get").model_dump(),
|
||||
json=LinkRequest.model_construct(
|
||||
url=website, max_timeout=30, cmd="request.get"
|
||||
).model_dump(),
|
||||
)
|
||||
|
||||
assert response.status_code == HTTPStatus.OK
|
||||
|
Loading…
x
Reference in New Issue
Block a user