source-dev/main.py
Ceda EI 4230489d4b Better search algorithm and unicode handling
- Use a line-based search (20% faster)
- Parse the file as bytes to avoid offsets being skewed by unicode characters
2025-05-02 17:10:24 +05:30

119 lines
2.6 KiB
Python

from pathlib import Path
from typing import Optional
from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel
app = FastAPI()
# Log files are served from the "data" directory located next to this module.
data_path = Path(__file__).parent.resolve() / "data"
class Files(BaseModel):
    """Response body for the log-file listing endpoint."""

    files: list[str]  # bare file names (no directory component)
class Log(BaseModel):
    """One window of a log file, addressed by byte offset."""

    start: int  # offset the window begins at (as requested by the caller)
    size: int  # length of `content` actually returned (may be < requested near EOF)
    total: int  # total size of the file in bytes
    content: str  # the window's text
    end: bool  # True when the window reaches the end of the file
class SearchResult(BaseModel):
    """Byte offsets of query matches within a log file."""

    matches: list[int]  # byte offsets of matches; the search endpoint caps this at 100
class Error(BaseModel):
    """Generic error payload for 400/404 responses."""

    error: str  # human-readable error message
def handle_path(name: str) -> tuple[Path, Optional[JSONResponse]]:
    """Resolve *name* inside the data directory.

    Returns ``(path, None)`` on success, or ``(path, response)`` where
    *response* is a ready-to-return JSONResponse: 400 for a path-traversal
    attempt, 404 when the log does not exist.
    """
    # Resolve symlinks and ".." components BEFORE the containment check:
    # is_relative_to() is purely lexical, so the unresolved path
    # data_path / "../etc/passwd" would pass the check below.
    path = data_path.joinpath(name).resolve()
    # Prevent path traversal outside the data directory
    if not path.is_relative_to(data_path):
        return path, JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST, content={"error": "Bad Request"}
        )
    if not path.is_file():
        # Both routes declare a 404 Error model for the missing-log case;
        # returning 400 here was inconsistent with that contract.
        return path, JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND, content={"error": "Log not found"}
        )
    return path, None
@app.get("/", response_model=Files)
def list_logs():
    """List the names of all *.txt log files in the data directory."""
    names = []
    for entry in data_path.glob("*.txt"):
        # glob can also match directories named like "foo.txt"; skip them.
        if entry.is_file():
            names.append(entry.name)
    return {"files": names}
@app.get(
    "/log/{name}/",
    response_model=Log,
    responses={
        400: {"model": Error},
        404: {"model": Error},
    },
)
def get_log(name: str, start: int = 0, size: int = 100):
    """Return a window of up to *size* bytes of log *name*, starting at byte *start*."""
    path, resp = handle_path(name)
    if resp:
        return resp
    total = path.stat().st_size
    # Read in binary so that `start` is a true byte offset.  The previous
    # text-mode version could seek into the middle of a multi-byte UTF-8
    # sequence and raise UnicodeDecodeError (the search endpoint already
    # switched to bytes for the same reason).
    with open(path, "rb") as f:
        f.seek(start)
        chunk = f.read(size)
    return {
        "start": start,
        "size": len(chunk),
        "total": total,
        # A byte window can still split a multi-byte sequence at either
        # edge; replace the partial bytes instead of crashing.
        "content": chunk.decode("utf-8", errors="replace"),
        # End-of-file when the bytes actually read reach the file size.
        "end": start + len(chunk) >= total,
    }
@app.get(
    "/log/{name}/search/",
    response_model=SearchResult,
    responses={
        400: {"model": Error},
        404: {"model": Error},
    },
)
def search_log(name: str, query: str, start: int = 0):
    """Return the byte offsets of *query* in log *name*, scanning line by line
    from byte offset *start*.  At most 100 matches are returned; matches that
    span a newline are not found (the scan is per-line)."""
    path, resp = handle_path(name)
    if resp:
        return resp
    needle = query.encode()
    hits: list[int] = []
    with open(path, "rb") as f:
        f.seek(start)
        position = start  # byte offset of the first byte of the current line
        for line in iter(f.readline, b""):
            at = 0
            while (found := line.find(needle, at)) != -1:
                hits.append(position + found)
                if len(hits) >= 100:
                    # Cap the result size; stop scanning immediately.
                    return {"matches": hits}
                # Advance by one byte so overlapping matches are counted.
                at = found + 1
            position += len(line)
    return {"matches": hits}