- Use a line-based search (20% faster)
- Parse the file as bytes to avoid offsets being skewed by Unicode characters
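The byte-based parsing matters because character offsets and byte offsets diverge as soon as a line contains multi-byte UTF-8 characters, so matches reported as character indexes could not be reused as file positions. A minimal standalone sketch of the difference (example string chosen purely for illustration, not part of the app):

    line = "naïve café"                          # "ï" and "é" are two bytes each in UTF-8
    print(line.find("café"))                     # 6 -> character offset
    print(line.encode().find("café".encode()))   # 7 -> byte offset, what a binary file reports
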
from pathlib import Path
from typing import Optional

from fastapi import FastAPI, status
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

data_path = Path(__file__).parent.resolve() / "data"


class Files(BaseModel):
    files: list[str]


class Log(BaseModel):
    start: int
    size: int
    total: int
    content: str
    end: bool


class SearchResult(BaseModel):
    matches: list[int]


class Error(BaseModel):
    error: str


def handle_path(name: str) -> tuple[Path, Optional[JSONResponse]]:
    # Resolve the requested name against the data directory; resolve() also
    # normalises any ".." components so the containment check below holds.
    path = data_path.joinpath(name).resolve()

    # Prevent path traversal
    if not path.is_relative_to(data_path):
        return path, JSONResponse(
            status_code=status.HTTP_400_BAD_REQUEST, content={"error": "Bad Request"}
        )

    if not path.is_file():
        return path, JSONResponse(
            status_code=status.HTTP_404_NOT_FOUND, content={"error": "Log not found"}
        )

    return path, None


@app.get("/", response_model=Files)
def list_logs():
    return {"files": [path.name for path in data_path.glob("*.txt") if path.is_file()]}


@app.get(
    "/log/{name}/",
    response_model=Log,
    responses={
        400: {"model": Error},
        404: {"model": Error},
    },
)
def get_log(name: str, start: int = 0, size: int = 100):
    path, resp = handle_path(name)
    if resp:
        return resp

    with open(path) as f:
        f.seek(start)
        content = f.read(size)

    total = path.stat().st_size

    return {
        "start": start,
        "size": len(content),
        "total": total,
        "content": content,
        "end": size + start >= total,
    }


@app.get(
    "/log/{name}/search/",
    response_model=SearchResult,
    responses={
        400: {"model": Error},
        404: {"model": Error},
    },
)
def search_log(name: str, query: str, start: int = 0):
    path, resp = handle_path(name)
    if resp:
        return resp

    query_b = query.encode()
    matches = []
    break_outer = False

    # Read the file as bytes, line by line, so match positions are true byte
    # offsets and multi-byte Unicode characters cannot skew them.
    with open(path, "rb") as f:
        f.seek(start)
        count = start  # byte offset of the current line's first byte

        while line := f.readline():
            offset = 0
            # Collect every occurrence of the query within this line.
            while True:
                index = line.find(query_b, offset)
                if index != -1:
                    matches.append(count + index)
                    offset = index + 1
                    if len(matches) >= 100:  # cap the response at 100 matches
                        break_outer = True
                        break
                else:
                    break
            if break_outer:
                break
            count += len(line)

    return {"matches": matches}
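A possible way to try it out, assuming the file is saved as main.py with a data/ directory of *.txt logs next to it (the file name app.txt and the port are placeholders, not part of the listing):

    uvicorn main:app --port 8000
    curl 'http://localhost:8000/'
    curl 'http://localhost:8000/log/app.txt/?start=0&size=100'
    curl 'http://localhost:8000/log/app.txt/search/?query=ERROR'

The search endpoint reports byte offsets into the file and caps each response at 100 matches, so a client can continue a capped search by re-issuing the request with start set just past the last reported match.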