"""Small FastAPI service exposing the log files stored under ``data/``.

Endpoints:

* ``GET /``                    -- list the ``*.txt`` files in the data directory.
* ``GET /log/{name}/``         -- return a window of one log file.
* ``GET /log/{name}/search/``  -- return offsets at which a query occurs.

All offsets and sizes (``start``, ``size``, ``total`` and the entries of
``matches``) are *byte* offsets into the file, so a value returned by the
search endpoint can be passed directly as ``start`` to the log endpoint.
"""

from pathlib import Path
from typing import Optional

from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()
data_path = Path(__file__).parent.resolve() / "data"

# Upper bound on the number of offsets returned by one search request.
MAX_MATCHES = 100
# Minimum number of bytes read from disk per search step.
SEARCH_CHUNK_SIZE = 1024


class Files(BaseModel):
    # Names of the log files available for reading.
    files: list[str]


class Log(BaseModel):
    # Byte offset at which the returned window starts.
    start: int
    # Number of bytes actually read for this window.
    size: int
    # Total size of the file in bytes.
    total: int
    # Window content decoded as UTF-8 (undecodable bytes are replaced).
    content: str
    # True when the window reaches the end of the file.
    end: bool


class SearchResult(BaseModel):
    # Byte offsets at which the query was found.
    matches: list[int]


class Error(BaseModel):
    # Human-readable error message.
    error: str


def _error(status_code: int, message: str) -> JSONResponse:
    """Build a JSON error response matching the ``Error`` model."""
    return JSONResponse(status_code=status_code, content={"error": message})


def handle_path(name: str) -> tuple[Path, Optional[JSONResponse]]:
    """Map a log name to a file inside the data directory.

    Args:
        name: File name taken from the request URL.

    Returns:
        A ``(path, error)`` tuple.  ``error`` is ``None`` when ``path`` is an
        existing regular file inside the data directory; otherwise it is the
        response the endpoint should return (400 for a path that escapes the
        data directory or cannot be resolved, 404 for a missing file).
    """
    path = data_path.joinpath(name)
    try:
        # Resolve both sides so that ".." segments and symlinks cannot be
        # used to escape the data directory; a purely lexical
        # is_relative_to() check would accept "data/../secret".
        root = data_path.resolve()
        path = path.resolve()
    except (OSError, RuntimeError, ValueError):
        # e.g. embedded NUL byte or a symlink loop.
        return path, _error(status.HTTP_400_BAD_REQUEST, "Bad Request")

    # Prevent path traversal
    if not path.is_relative_to(root):
        return path, _error(status.HTTP_400_BAD_REQUEST, "Bad Request")

    if not path.is_file():
        return path, _error(status.HTTP_404_NOT_FOUND, "Log not found")

    return path, None


@app.get("/", response_model=Files)
def list_logs():
    """List the ``*.txt`` files in the data directory, sorted by name."""
    return {
        "files": sorted(
            path.name for path in data_path.glob("*.txt") if path.is_file()
        )
    }


@app.get(
    "/log/{name}/",
    response_model=Log,
    responses={
        400: {"model": Error},
        404: {"model": Error},
    },
)
def get_log(name: str, start: int = 0, size: int = 100):
    """Return a window of a log file.

    Args:
        name: Log file name inside the data directory.
        start: Byte offset at which to start reading; must be >= 0.
        size: Maximum number of bytes to read.  A negative value reads to
            the end of the file.

    Returns:
        A dict matching the ``Log`` model, or an error response (400 for an
        invalid path or a negative ``start``, 404 for a missing log).
    """
    path, resp = handle_path(name)
    if resp is not None:
        return resp
    if start < 0:
        return _error(status.HTTP_400_BAD_REQUEST, "Bad Request")

    # Binary mode: seeking to an arbitrary offset is only well defined for
    # binary files, and it keeps start/size/total in the same unit (bytes).
    with open(path, "rb") as f:
        f.seek(start)
        raw = f.read(size if size >= 0 else -1)

    total = path.stat().st_size
    return {
        "start": start,
        "size": len(raw),
        "total": total,
        # A window may begin or end in the middle of a multi-byte character;
        # replace such fragments instead of failing the request.
        "content": raw.decode("utf-8", errors="replace"),
        # Based on what was actually read, not on the requested size.
        "end": start + len(raw) >= total,
    }


@app.get(
    "/log/{name}/search/",
    response_model=SearchResult,
    responses={
        400: {"model": Error},
        404: {"model": Error},
    },
)
def search_log(name: str, query: str, start: int = 0):
    """Find occurrences of ``query`` in a log file.

    The file is scanned in chunks so that it never has to be loaded into
    memory as a whole.  Overlapping occurrences are reported individually.

    Args:
        name: Log file name inside the data directory.
        query: Text to look for (matched as its UTF-8 encoding).  An empty
            query yields no matches.
        start: Byte offset at which to start searching; must be >= 0.

    Returns:
        A dict matching the ``SearchResult`` model with at most
        ``MAX_MATCHES`` byte offsets in ascending order, or an error
        response (400 for an invalid path or a negative ``start``, 404 for a
        missing log).
    """
    path, resp = handle_path(name)
    if resp is not None:
        return resp
    if start < 0:
        return _error(status.HTTP_400_BAD_REQUEST, "Bad Request")

    needle = query.encode("utf-8")
    matches: list[int] = []
    if not needle:
        # An empty pattern would "match" at every position.
        return {"matches": matches}

    chunk_size = max(len(needle), SEARCH_CHUNK_SIZE)
    # Bytes carried over between windows.  len(needle) - 1 is enough to
    # catch a match straddling the boundary, and short enough that a match
    # lying entirely in the previous window is never reported twice.
    overlap = len(needle) - 1

    with open(path, "rb") as f:
        f.seek(start)
        base = start  # absolute file offset of window[0]
        window = f.read(2 * chunk_size)
        while window:
            index = window.find(needle)
            while index != -1:
                matches.append(base + index)
                if len(matches) >= MAX_MATCHES:
                    return {"matches": matches}
                index = window.find(needle, index + 1)

            data = f.read(chunk_size)
            if not data:
                break
            carried = window[max(len(window) - overlap, 0):]
            base += len(window) - len(carried)
            window = carried + data

    return {"matches": matches}