576 lines
19 KiB
Python
576 lines
19 KiB
Python
import asyncio
import hashlib
import io
import json
import os
import re
from urllib import request as urllib_request
from urllib.error import URLError, HTTPError

import fitz
import pytesseract
import torch
from cachetools import TTLCache
from docx import Document
from fastapi import FastAPI, File, HTTPException, UploadFile
from PIL import Image
from pydantic import BaseModel, Field
from pypdf import PdfReader
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
|
|
|
|
# ASGI application object; the @app.get / @app.post decorators in this module
# register their handlers on it.
app = FastAPI(title="Local AI Service")
|
|
|
|
# Checkpoint name handed to the transformers ``from_pretrained`` loaders.
MODEL_NAME = "sshleifer/distilbart-cnn-12-6"

# Maximum length, in characters, accepted for SummarizeRequest.text.
MAX_INPUT_CHARS = 20000
# Number of leading characters of the cleaned text appended as "Context" to
# the summarizer input.
MAX_CONTEXT_CHARS = 2200
# Largest upload accepted by /extract-text (8 MiB).
MAX_EXTRACT_FILE_BYTES = 8 * 1024 * 1024

# Language argument passed to pytesseract.
OCR_LANGUAGES = "eng"
# Upload extensions that are sent straight to OCR.
IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".webp"}

# Ollama settings come from the environment. An empty OLLAMA_MODEL means the
# Ollama-backed features are treated as "not configured".
OLLAMA_BASE_URL = os.environ.get("OLLAMA_BASE_URL", "http://127.0.0.1:11434").rstrip("/")
OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "")
|
|
|
|
|
|
def _load_runtime():
    """Load the summarization tokenizer and model and move the model to a device.

    The model is put into eval mode and placed on CUDA when
    ``torch.cuda.is_available()`` reports a GPU, otherwise on the CPU.

    Returns:
        A 5-tuple ``(tokenizer, model, device, has_cuda, gpu_name)`` where
        ``gpu_name`` is the name of CUDA device 0, or ``None`` without CUDA.
    """
    cuda_ok = torch.cuda.is_available()
    target = torch.device("cuda" if cuda_ok else "cpu")

    tok = AutoTokenizer.from_pretrained(MODEL_NAME)
    net = AutoModelForSeq2SeqLM.from_pretrained(MODEL_NAME)
    net.eval()
    net.to(target)

    card_name = torch.cuda.get_device_name(0) if cuda_ok else None
    return tok, net, target, cuda_ok, card_name
|
|
|
|
|
|
# Loaded once at import time and shared by every request handler.
tokenizer, model, device, GPU_AVAILABLE, GPU_NAME = _load_runtime()

# Finished /summarize outputs, keyed by _key(...): at most 1024 entries, each
# kept for one hour.
cache = TTLCache(maxsize=1024, ttl=3600)
|
|
|
|
|
|
class SummarizeRequest(BaseModel):
    # Request body of POST /summarize.

    # Text to summarize; HTML tags are stripped before processing.
    text: str = Field(min_length=1, max_length=MAX_INPUT_CHARS)
    # Forwarded to model.generate() as max_length / min_length. The endpoint
    # additionally rejects min_length >= max_length.
    max_length: int = Field(default=160, le=256, ge=24)
    min_length: int = Field(default=45, le=180, ge=8)
    # Upper bound on the number of entries under "Top hard skills".
    top_skills: int = Field(default=8, le=12, ge=3)
|
|
|
|
|
|
class CvClassifyBlockRequest(BaseModel):
    # Request body of POST /cv/classify-block.

    # One block of CV text, embedded verbatim (after strip()) in the prompt.
    block: str = Field(max_length=6000, min_length=1)
|
|
|
|
|
|
def _key(text: str, max_length: int, min_length: int, top_skills: int) -> str:
    """Build the cache key for one summarize request.

    The key is the SHA-256 hex digest of the UTF-8 encoded text followed by
    the three numeric options, all joined with ``:``.
    """
    digest = hashlib.sha256(text.encode("utf-8")).hexdigest()
    options = (max_length, min_length, top_skills)
    return ":".join([digest, *(str(option) for option in options)])
|
|
|
|
|
|
def _ollama_status():
    """Probe the optional Ollama backend for the /health report.

    Performs a blocking ``GET {OLLAMA_BASE_URL}/api/tags`` with a 5 second
    timeout when ``OLLAMA_MODEL`` is set. Never raises: any failure is
    reported through the returned flags.

    Returns:
        dict with the keys ``ollama_configured``, ``ollama_reachable``,
        ``ollama_model`` (``None`` when not configured) and
        ``ollama_model_available``.
    """
    status = {
        "ollama_configured": bool(OLLAMA_MODEL),
        "ollama_reachable": False,
        "ollama_model": OLLAMA_MODEL or None,
        "ollama_model_available": False,
    }
    if not OLLAMA_MODEL:
        return status

    req = urllib_request.Request(f"{OLLAMA_BASE_URL}/api/tags", method="GET")
    try:
        with urllib_request.urlopen(req, timeout=5) as response:
            body = json.loads(response.read().decode("utf-8"))
    except Exception:
        # Deliberately broad: this is a best-effort health probe, so network,
        # HTTP and decoding errors all just mean "not reachable".
        return status

    # Something answered with JSON, so the server counts as reachable even if
    # the payload is not shaped the way we expect.
    status["ollama_reachable"] = True

    models = body.get("models") if isinstance(body, dict) else None
    if not isinstance(models, list):
        return status
    names = {item.get("name") for item in models if isinstance(item, dict)}

    # Ollama lists models as "name:tag" and treats a missing tag as "latest",
    # so an untagged OLLAMA_MODEL must also match its ":latest" entry. Only the
    # last path segment is inspected so a registry "host:port/" prefix is not
    # mistaken for a tag.
    wanted = {OLLAMA_MODEL}
    if ":" not in OLLAMA_MODEL.rsplit("/", 1)[-1]:
        wanted.add(f"{OLLAMA_MODEL}:latest")
    status["ollama_model_available"] = not wanted.isdisjoint(names)
    return status
|
|
|
|
|
|
@app.get("/health")
async def health():
    """Report the summarization model, compute device, OCR and Ollama status."""
    # _ollama_status() makes a blocking HTTP request (up to 5 s). Running it in
    # a worker thread keeps the event loop free to serve other requests.
    ollama = await asyncio.to_thread(_ollama_status)
    return {
        "ok": True,
        "model": MODEL_NAME,
        "device": str(device),
        "gpu_available": GPU_AVAILABLE,
        "gpu_name": GPU_NAME,
        # NOTE(review): hard-coded; the Tesseract installation is not probed here.
        "ocr_available": True,
        "ocr_languages": OCR_LANGUAGES,
        **ollama,
    }
|
|
|
|
|
|
# Technology terms searched for in the lower-cased job text. List order is the
# order in which hits are collected.
_TECH = [
    "python", "c#", "dotnet", ".net", "java", "javascript", "typescript",
    "react", "node", "sql",
    "postgres", "postgresql", "mysql", "sqlite", "mongodb", "redis",
    "aws", "azure", "gcp",
    "docker", "kubernetes", "terraform", "linux", "git", "ci/cd",
    "graphql", "rest",
]

# Soft-skill phrases searched for in the lower-cased job text.
_SOFT = [
    "communication", "collaboration", "teamwork", "problem solving",
    "leadership", "mentoring",
    "ownership", "initiative", "adaptability", "stakeholder management",
    "detail oriented",
]

# Display order used by _rank_tech_skills(): earlier entries are listed first.
_TECH_PRIORITY = [
    "python", "c#", ".net", "dotnet", "typescript", "javascript", "react",
    "node",
    "sql", "postgresql", "postgres", "mysql", "sqlite", "docker",
    "kubernetes",
    "aws", "azure", "gcp", "terraform", "graphql", "rest", "git",
]

# Lower-case phrases used to pick sentences when no structured section was found.
_MUST_HAVE_HINTS = [
    "must have", "required", "requirements", "you have", "you bring",
    "essential", "we are looking for",
]
_NICE_TO_HAVE_HINTS = [
    "nice to have", "bonus", "preferred", "advantageous", "extra plus",
]
# Phrases that mark a requirement as something to prepare evidence for.
_SCREENING_HINTS = [
    "experience with", "hands-on", "demonstrated", "proven", "track record",
    "delivered",
]
|
|
|
|
|
|
def _rank_tech_skills(skills):
    """Return *skills* de-duplicated and ordered by ``_TECH_PRIORITY``.

    Skills named in ``_TECH_PRIORITY`` come first, in priority order; any
    other skills follow in their original order of first appearance.
    """
    present = set(skills)
    ranked = []
    placed = set()

    # First pass: walk the priority list and pick up whatever is present.
    for name in _TECH_PRIORITY:
        if name in present and name not in placed:
            ranked.append(name)
            placed.add(name)

    # Second pass: everything without a priority keeps its input order.
    for name in skills:
        if name not in placed:
            ranked.append(name)
            placed.add(name)

    return ranked
|
|
|
|
|
|
def _strip_html(text: str) -> str:
    """Turn an HTML fragment into plain text while keeping its line structure.

    ``<br>`` and the closing tags of paragraphs, list items, divs and headings
    become newlines, and each ``<li>`` starts a new ``- `` bullet line so that
    HTML lists remain recognisable as bullets. Every remaining tag is replaced
    by a single space, runs of three or more newlines collapse to two, and the
    result is stripped. Text without tags passes through unchanged apart from
    that newline collapsing and stripping.
    """
    text = re.sub(r"<\s*br\s*/?>", "\n", text, flags=re.IGNORECASE)
    # Without this, "<li>a</li><li>b</li>" collapsed to "a  b" on one line and
    # the bullet extraction never saw separate items.
    text = re.sub(r"<\s*li\b[^>]*>", "\n- ", text, flags=re.IGNORECASE)
    # Block-level closers end a line, so a heading such as "<h2>Skills</h2>" no
    # longer runs into the text that follows it.
    text = re.sub(r"</\s*(?:p|li|div|h[1-6])\s*>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", " ", text)
    return re.sub(r"\n{3,}", "\n\n", text).strip()
|
|
|
|
|
|
def _extract_bullets(lines, max_items=8):
    """Collect the text of bullet lines (``-``, ``*`` or ``•`` followed by whitespace).

    Blank lines are skipped. A bullet is kept when its text, with the marker
    removed, is between 3 and 220 characters long. Scanning stops once
    *max_items* bullets have been collected.
    """
    marker = re.compile(r"^([-*]|\u2022)\s+")
    bullets = []
    for raw in lines:
        candidate = raw.strip()
        if candidate:
            hit = marker.match(candidate)
            if hit is not None:
                body = candidate[hit.end():].strip()
                if 3 <= len(body) <= 220:
                    bullets.append(body)
            # Checked after every non-blank line, bullet or not.
            if len(bullets) >= max_items:
                break
    return bullets
|
|
|
|
|
|
def _top_keywords(text: str, limit=6):
    """Return up to *limit* of the most frequent non-trivial words in *text*.

    Words are taken from the lower-cased text, start with a letter and may
    contain ``+ # . / -`` (so "c++" or "ci/cd" stay whole). Trailing ``.``,
    ``/`` and ``-`` are removed so that a word at the end of a sentence
    ("kafka.") is counted together with its plain form and is still matched
    by the stop-word and skill filters. Stop words and entries of ``_TECH`` /
    ``_SOFT`` are ignored. Results are ordered by descending count, then
    alphabetically.
    """
    stop = {
        "with", "from", "that", "this", "will", "have", "your", "their", "about", "role", "team", "work",
        "experience", "skills", "requirements", "responsibilities", "company", "using", "ability", "years",
        "looking", "candidate", "position", "working", "across", "strong", "building", "support",
    }
    counts = {}
    for token in re.findall(r"[a-zA-Z][a-zA-Z+#./-]{2,}", text.lower()):
        # The token pattern admits punctuation inside words, which also lets
        # it swallow the sentence punctuation that follows a word.
        word = token.rstrip("./-")
        if len(word) < 3:
            continue
        if word in stop or word in _TECH or word in _SOFT:
            continue
        counts[word] = counts.get(word, 0) + 1
    ordered = sorted(counts.items(), key=lambda item: (-item[1], item[0]))
    return [word for word, _ in ordered[:limit]]
|
|
|
|
|
|
def _first_matching_sentences(text: str, hints, limit=3):
    """Return the first sentences of *text* that contain any of *hints*.

    The text is split after ``.``, ``!`` or ``?`` followed by whitespace.
    Matching is a case-insensitive substring test against the lower-case
    *hints*. Only sentences of 20 to 220 characters (after stripping) are
    kept, and scanning stops once *limit* sentences have been collected.
    """
    matches = []
    for chunk in re.split(r"(?<=[.!?])\s+", text):
        lowered = chunk.lower()
        for hint in hints:
            if hint in lowered:
                candidate = chunk.strip()
                if 20 <= len(candidate) <= 220:
                    matches.append(candidate)
                break
        if len(matches) >= limit:
            break
    return matches
|
|
|
|
|
|
def _trim_line(text: str, max_len: int = 140) -> str:
    """Collapse whitespace, strip bullet decoration and cap the length.

    Whitespace runs become single spaces; leading and trailing spaces, ``-``,
    ``•`` and tabs are removed. A result longer than *max_len* is cut to
    ``max_len - 1`` characters (trailing whitespace removed) plus ``…``.
    """
    squeezed = re.sub(r"\s+", " ", text)
    core = squeezed.strip(" -•\t")
    if len(core) > max_len:
        return core[: max_len - 1].rstrip() + "…"
    return core
|
|
|
|
|
|
# Lower-case heading phrases recognised for each section of a job description.
_SECTION_HEADINGS = {
    "responsibilities": ["responsibilities", "what you will do", "what you'll do", "the role", "your role", "you will"],
    "requirements": ["requirements", "what we are looking for", "what we're looking for", "skills", "experience", "must have"],
    "nice": ["nice to have", "bonus", "preferred"],
}

# Same bullet markers that _extract_bullets() recognises.
_BULLET_LINE_RE = re.compile(r"^([-*]|\u2022)\s+")


def _match_section_heading(line: str, allow_prefix: bool = True):
    """Return the section key whose heading phrase *line* matches, else ``None``.

    A line matches when, lower-cased and stripped of surrounding ``:``, ``-``,
    DEL and spaces, it equals a heading phrase or (with *allow_prefix*) starts
    with the phrase followed by a space.
    """
    normalized = line.lower().strip(":-\x7f ")
    for key, phrases in _SECTION_HEADINGS.items():
        for phrase in phrases:
            if normalized == phrase:
                return key
            if allow_prefix and normalized.startswith(phrase + " "):
                return key
    return None


def _find_tech_terms(low_text: str, terms):
    """Return the *terms* that occur in *low_text* as whole tokens, in *terms* order.

    A term only matches when it is not glued to a neighbouring letter or digit,
    so "java" is not reported for "javascript" and "rest" not for "interest".
    The boundary check is applied only on a side where the term itself starts
    or ends with a letter or digit (".net" still matches inside "asp.net",
    "c#" inside "c#/.net"). The fused spellings "<term>js" and "<term>ful"
    (nodejs, reactjs, restful) are still accepted.
    """
    found = []
    for term in terms:
        left = r"(?<![a-z0-9])" if term[:1].isalnum() else ""
        right = r"(?:js|ful)?(?![a-z0-9])" if term[-1:].isalnum() else ""
        if re.search(left + re.escape(term) + right, low_text):
            found.append(term)
    return found


def _role_focused_excerpt(text: str) -> dict:
    """Extract the role-relevant parts of a job description.

    The text is stripped of HTML, split into lines and walked once: heading
    lines switch the current section (responsibilities / requirements / nice
    to have) and every other line is filed under the current section. Bullets
    are then pulled from each section, with sentence-based fallbacks when a
    section yields nothing.

    Returns:
        dict with the keys ``cleaned`` (tag-free text), ``focused_input``
        (text handed to the summarizer), ``responsibilities``,
        ``requirements``, ``nice``, ``tech``, ``soft``, ``keywords`` and
        ``screen_focus`` (at most three items).
    """
    cleaned = _strip_html(text)
    lines = [ln.strip() for ln in cleaned.splitlines()]

    section = None
    resp_lines = []
    req_lines = []
    nice_lines = []

    for ln in lines:
        if not ln:
            continue
        # A bullet is content, not a heading: "- Experience with Docker" used
        # to be read as the "experience" heading and silently dropped. Bullets
        # therefore only count as headings on an exact phrase match
        # (e.g. "- Requirements:").
        is_bullet = _BULLET_LINE_RE.match(ln) is not None
        heading = _match_section_heading(ln, allow_prefix=not is_bullet)
        if heading:
            section = heading
            continue
        if section == "responsibilities":
            resp_lines.append(ln)
        elif section == "requirements":
            req_lines.append(ln)
        elif section == "nice":
            nice_lines.append(ln)

    responsibilities = _extract_bullets(resp_lines, max_items=7)
    requirements = _extract_bullets(req_lines, max_items=7)
    nice = _extract_bullets(nice_lines, max_items=5)

    low = cleaned.lower()
    # Token-aware matching for technologies; soft-skill phrases keep the
    # original substring test.
    tech_found = _find_tech_terms(low, _TECH)
    soft_found = [phrase for phrase in _SOFT if phrase in low]

    # No recognisable sections: treat the first bullets found anywhere as
    # responsibilities and the next few as requirements.
    if not responsibilities and not requirements:
        any_bullets = _extract_bullets(lines, max_items=10)
        responsibilities = any_bullets[:6]
        requirements = any_bullets[6:10]

    if not requirements:
        requirements = [_trim_line(x) for x in _first_matching_sentences(cleaned, _MUST_HAVE_HINTS, limit=4)]
    if not nice:
        nice = [_trim_line(x) for x in _first_matching_sentences(cleaned, _NICE_TO_HAVE_HINTS, limit=3)]

    focused_parts = []
    if responsibilities:
        focused_parts.append("Responsibilities:\n- " + "\n- ".join(responsibilities))
    if requirements:
        focused_parts.append("Requirements:\n- " + "\n- ".join(requirements))
    if nice:
        focused_parts.append("Nice to have:\n- " + "\n- ".join(nice))
    focused_parts.append("Context:\n" + cleaned[:MAX_CONTEXT_CHARS])

    # The first two of the top four requirements are always kept; later ones
    # only when they contain a screening phrase.
    screen_focus = []
    for item in requirements[:4]:
        if any(hint in item.lower() for hint in _SCREENING_HINTS) or len(screen_focus) < 2:
            screen_focus.append(_trim_line(item))
    if not screen_focus:
        screen_focus = [_trim_line(x) for x in _first_matching_sentences(cleaned, _SCREENING_HINTS, limit=3)]

    return {
        "cleaned": cleaned,
        "focused_input": "\n\n".join(focused_parts),
        "responsibilities": responsibilities,
        "requirements": requirements,
        "nice": nice,
        "tech": tech_found,
        "soft": soft_found,
        "keywords": _top_keywords(cleaned),
        "screen_focus": screen_focus[:3],
    }
|
|
|
|
|
|
def _model_summarize(text: str, max_length: int, min_length: int) -> str:
    """Generate a summary of *text* with the module-level seq2seq model.

    The input is tokenized with truncation at 1024 tokens, generation runs
    under ``torch.no_grad()`` with 3-beam search, and the first generated
    sequence is decoded without special tokens and stripped.
    """
    encoded = tokenizer(text, return_tensors="pt", truncation=True, max_length=1024)
    ids = encoded.input_ids.to(device)
    mask = None
    if hasattr(encoded, "attention_mask"):
        mask = encoded.attention_mask.to(device)

    generation_options = {
        "attention_mask": mask,
        "max_length": max_length,
        "min_length": min_length,
        "num_beams": 3,
        "length_penalty": 1.0,
        "no_repeat_ngram_size": 3,
        "early_stopping": True,
    }
    with torch.no_grad():
        generated = model.generate(ids, **generation_options)

    return tokenizer.decode(generated[0], skip_special_tokens=True).strip()
|
|
|
|
|
|
def _loads_lenient(raw: str):
    """Parse *raw* as JSON, tolerating prose around a single JSON object.

    When *raw* is not valid JSON as a whole, the span from its first ``{`` to
    its last ``}`` is tried instead.

    Raises:
        ValueError: neither attempt yields valid JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass).
    """
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        start = raw.find("{")
        end = raw.rfind("}")
        if start >= 0 and end > start:
            return json.loads(raw[start:end + 1])
        raise


def _ollama_generate_json(prompt: str):
    """Send *prompt* to Ollama's ``/api/generate`` and return the decoded JSON answer.

    The request is non-streaming, asks for JSON output at temperature 0.1 and
    blocks for up to 30 seconds.

    Returns:
        The JSON value parsed from the model's ``response`` field.

    Raises:
        HTTPException: 503 when ``OLLAMA_MODEL`` is unset or Ollama cannot be
            reached, 504 when the connection times out or drops while reading,
            502 when Ollama answers with an HTTP error, a malformed envelope,
            an empty response or text that is not valid JSON.
    """
    if not OLLAMA_MODEL:
        raise HTTPException(status_code=503, detail="OLLAMA_MODEL is not configured.")

    payload = json.dumps({
        "model": OLLAMA_MODEL,
        "prompt": prompt,
        "stream": False,
        "format": "json",
        "options": {"temperature": 0.1},
    }).encode("utf-8")

    req = urllib_request.Request(
        f"{OLLAMA_BASE_URL}/api/generate",
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

    try:
        with urllib_request.urlopen(req, timeout=30) as response:
            raw_body = response.read()
    except HTTPError as ex:
        raise HTTPException(status_code=502, detail=f"Ollama request failed with {ex.code}.") from ex
    except URLError as ex:
        raise HTTPException(status_code=503, detail=f"Ollama is unreachable: {ex.reason}.") from ex
    except OSError as ex:
        # A timeout or reset while *reading* the response is raised as a plain
        # socket error, not wrapped in URLError, and used to surface as a 500.
        raise HTTPException(
            status_code=504,
            detail=f"Ollama did not answer in time or dropped the connection: {ex}.",
        ) from ex

    try:
        body = json.loads(raw_body.decode("utf-8"))
    except ValueError as ex:  # covers JSONDecodeError and UnicodeDecodeError
        raise HTTPException(status_code=502, detail="Ollama returned a malformed response.") from ex
    if not isinstance(body, dict):
        raise HTTPException(status_code=502, detail="Ollama returned a malformed response.")

    raw = (body.get("response") or "").strip()
    if not raw:
        raise HTTPException(status_code=502, detail="Ollama returned an empty response.")

    try:
        return _loads_lenient(raw)
    except ValueError:
        raise HTTPException(status_code=502, detail="Ollama did not return valid JSON.") from None
|
|
|
|
|
|
# Section labels the model may return; anything else is reported as "Other".
_CV_SECTIONS = (
    "Contact",
    "Professional Summary",
    "Work Experience",
    "Education",
    "Skills",
    "Languages",
    "Interests",
    "Other",
)


@app.post("/cv/classify-block")
async def classify_cv_block(req: CvClassifyBlockRequest):
    """Classify one block of CV text into a section and extract its fields.

    The block is sent to the configured Ollama model. The reply is returned
    with the keys section, confidence, reason, title, company, location,
    start, end and bullets.
    """
    section_choices = "|".join(_CV_SECTIONS)
    prompt = f"""
You classify one CV text block into structured JSON.
Return ONLY valid JSON with this exact shape:
{{
"section": "{section_choices}",
"confidence": 0.0,
"reason": "short reason",
"title": string|null,
"company": string|null,
"location": string|null,
"start": string|null,
"end": string|null,
"bullets": string[]
}}

Rules:
- Preserve facts only.
- section must be one of the listed values.
- Use Work Experience only for job/employment blocks.
- For Contact blocks, keep title/company/start/end null and bullets empty.
- For non-work blocks, title/company/start/end should usually be null.
- location must look like a place, not a sentence.
- dates must be one of: year, month+year, dd/mm/yyyy, Present, Current.
- bullets should only be job tasks/achievements, not titles, companies, dates, or headings.
- If unsure, choose Other and keep fields null/empty.

Block:
{req.block.strip()}
""".strip()

    # The Ollama call blocks for up to 30 s; run it in a worker thread so the
    # event loop keeps serving other requests.
    parsed = await asyncio.to_thread(_ollama_generate_json, prompt)
    if not isinstance(parsed, dict):
        # Valid JSON, but not the object the prompt asked for.
        raise HTTPException(status_code=502, detail="Ollama did not return a JSON object.")

    # The model is only instructed, not forced, to follow the schema.
    section = parsed.get("section")
    if section not in _CV_SECTIONS:
        section = "Other"
    bullets = parsed.get("bullets")
    if not isinstance(bullets, list):
        bullets = []

    return {
        "section": section,
        "confidence": parsed.get("confidence"),
        "reason": parsed.get("reason"),
        "title": parsed.get("title"),
        "company": parsed.get("company"),
        "location": parsed.get("location"),
        "start": parsed.get("start"),
        "end": parsed.get("end"),
        "bullets": bullets,
    }
|
|
|
|
|
|
def _append_section(lines, title, items):
    """Append a blank line, *title* and one ``- item`` line per entry of *items*."""
    lines.append("")
    lines.append(title)
    lines.extend(f"- {item}" for item in items)


def _compose_summary(text: str, max_length: int, min_length: int, top_skills: int) -> str:
    """Build the full plain-text summary for one job description.

    Runs the (blocking) extraction and model inference and formats the result
    as a model-written role summary followed by bullet sections.
    """
    info = _role_focused_excerpt(text)
    summary = _model_summarize(info["focused_input"], max_length, min_length)

    # dict.fromkeys() removes duplicates while keeping first-seen order.
    ranked_tech = list(dict.fromkeys(_rank_tech_skills(info["tech"])))
    uniq_soft = list(dict.fromkeys(info["soft"]))

    lines = ["Role summary:", summary]

    if info["requirements"]:
        _append_section(lines, "What the company wants most:", [_trim_line(x) for x in info["requirements"][:5]])
    if ranked_tech:
        _append_section(lines, "Top hard skills:", ranked_tech[:top_skills])
    if info["keywords"]:
        _append_section(lines, "Keywords to mirror:", info["keywords"][:5])
    if info["responsibilities"]:
        _append_section(lines, "What you would be doing:", [_trim_line(x) for x in info["responsibilities"][:4]])
    if info["nice"]:
        _append_section(lines, "Nice to have:", [_trim_line(x) for x in info["nice"][:3]])
    if uniq_soft:
        _append_section(lines, "Relevant soft skills:", uniq_soft[:5])

    # "Interview focus" is always present and uses the most specific source
    # available.
    if info["screen_focus"]:
        focus = [f"Be ready to prove: {_trim_line(x)}" for x in info["screen_focus"]]
    elif info["requirements"]:
        focus = [f"Prepare examples that demonstrate: {_trim_line(x)}" for x in info["requirements"][:3]]
    elif ranked_tech:
        focus = [f"Be ready to explain your hands-on experience with {x}" for x in ranked_tech[:3]]
    else:
        focus = ["Prepare examples showing relevant impact, collaboration, and delivery."]
    _append_section(lines, "Interview focus:", focus)

    return "\n".join(lines).strip()


@app.post("/summarize")
async def summarize(req: SummarizeRequest):
    """Summarize a job description into a role summary plus key bullet sections.

    Returns the summary text and whether it was served from the cache.
    Responds with 400 when min_length is not smaller than max_length.
    """
    if req.min_length >= req.max_length:
        raise HTTPException(status_code=400, detail="min_length must be smaller than max_length.")

    key = _key(req.text, req.max_length, req.min_length, req.top_skills)
    # Single lookup: "key in cache" followed by "cache[key]" can raise KeyError
    # if the entry expires between the two calls.
    cached = cache.get(key)
    if cached is not None:
        return {"summary": cached, "cached": True}

    # Extraction and model inference are CPU/GPU-bound and blocking. Running
    # them in a worker thread keeps the event loop (and /health) responsive.
    # The cache itself is only touched from the event loop.
    out = await asyncio.to_thread(
        _compose_summary, req.text, req.max_length, req.min_length, req.top_skills
    )
    cache[key] = out
    return {"summary": out, "cached": False}
|
|
|
|
|
|
def _normalize_text(value: str) -> str:
    """Replace NUL characters with spaces and collapse all whitespace runs.

    The result has single spaces between words and no leading or trailing
    whitespace; newlines and tabs count as whitespace.
    """
    without_nuls = value.replace("\x00", " ")
    # split() with no argument splits on runs of whitespace and drops empty
    # leading and trailing pieces.
    return " ".join(without_nuls.split())
|
|
|
|
|
|
def _ocr_image(image: Image.Image) -> str:
    """Run Tesseract OCR on *image* and return the normalized text.

    Images whose mode is neither ``RGB`` nor ``L`` are converted to ``RGB``
    first.
    """
    prepared = image if image.mode in ("RGB", "L") else image.convert("RGB")
    recognized = pytesseract.image_to_string(prepared, lang=OCR_LANGUAGES)
    return _normalize_text(recognized)
|
|
|
|
|
|
def _extract_pdf_text(data: bytes) -> tuple[str, bool, int]:
    """Extract text from a PDF, falling back to OCR when the text layer is thin.

    The embedded text layer is read with pypdf first. If that yields fewer
    than 80 characters (or pypdf fails), every page is rendered with PyMuPDF
    at 2x scale and passed through OCR instead.

    Returns:
        ``(text, ocr_used, page_count)``.

    Raises:
        Whatever PyMuPDF, Pillow or pytesseract raise on the OCR path; only
        the pypdf stage is best-effort.
    """
    page_count = 0
    try:
        reader = PdfReader(io.BytesIO(data))
        page_count = len(reader.pages)
        extracted_pages = [page.extract_text() or "" for page in reader.pages]
    except Exception:
        # Deliberately broad: any pypdf failure just means "try OCR instead".
        extracted_pages = []

    text = _normalize_text("\n".join(extracted_pages))
    if len(text) >= 80:
        return text, False, page_count

    doc = fitz.open(stream=data, filetype="pdf")
    try:
        page_count = doc.page_count
        ocr_pages = []
        for page in doc:
            pix = page.get_pixmap(matrix=fitz.Matrix(2, 2), alpha=False)
            # Close each rendered page image as soon as it has been read.
            with Image.open(io.BytesIO(pix.tobytes("png"))) as image:
                ocr_pages.append(_ocr_image(image))
    finally:
        # Release the document even when rendering or OCR raises.
        doc.close()
    return _normalize_text("\n".join(ocr_pages)), True, page_count
|
|
|
|
|
|
def _extract_docx_text(data: bytes) -> str:
    """Return the normalized text of all non-blank paragraphs of a .docx file."""
    doc = Document(io.BytesIO(data))
    kept = []
    for paragraph in doc.paragraphs:
        content = paragraph.text
        if content and content.strip():
            kept.append(content.strip())
    return _normalize_text("\n".join(kept))
|
|
|
|
|
|
def _extract_plain_text(data: bytes) -> str:
    """Decode *data* as UTF-8 and return the normalized text.

    Undecodable bytes are dropped. The ``utf-8-sig`` codec is used so that a
    leading byte-order mark does not end up as an invisible U+FEFF character
    at the start of the text; input without a BOM decodes exactly as with
    plain ``utf-8``.
    """
    return _normalize_text(data.decode("utf-8-sig", errors="ignore"))
|
|
|
|
|
|
def _extract_by_extension(extension: str, data: bytes):
    """Dispatch *data* to the extractor for *extension*.

    Returns:
        ``(text, ocr_used, page_count)``; ``page_count`` is ``None`` for
        formats without pages.

    Raises:
        HTTPException: 400 for an unsupported extension.
    """
    if extension in {".txt", ".md"}:
        return _extract_plain_text(data), False, None
    if extension == ".docx":
        return _extract_docx_text(data), False, None
    if extension == ".pdf":
        return _extract_pdf_text(data)
    if extension in IMAGE_EXTENSIONS:
        # Close the decoded image once OCR is done.
        with Image.open(io.BytesIO(data)) as image:
            return _ocr_image(image), True, 1
    raise HTTPException(status_code=400, detail="This file type is not supported for AI extraction.")


@app.post("/extract-text")
async def extract_text(file: UploadFile = File(...)):
    """Extract readable text from an uploaded .txt, .md, .docx, .pdf or image file.

    Responds with 400 for empty, oversized or unsupported uploads, 422 when no
    text could be found and 500 when extraction itself fails.
    """
    filename = file.filename or "document"
    extension = "." + filename.rsplit(".", 1)[1].lower() if "." in filename else ""

    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling all of it into memory first.
    data = await file.read(MAX_EXTRACT_FILE_BYTES + 1)
    if not data:
        raise HTTPException(status_code=400, detail="The uploaded file was empty.")
    if len(data) > MAX_EXTRACT_FILE_BYTES:
        raise HTTPException(status_code=400, detail="The uploaded file is too large for AI extraction.")

    try:
        # Parsing, page rendering and OCR are blocking; run them in a worker
        # thread so the event loop stays responsive.
        text, ocr_used, page_count = await asyncio.to_thread(_extract_by_extension, extension, data)
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=f"AI extraction failed: {exc}") from exc

    if not text:
        raise HTTPException(status_code=422, detail="AI extraction did not find readable text in the uploaded file.")

    return {
        "text": text,
        "ocr_used": ocr_used,
        "content_type": file.content_type,
        "page_count": page_count,
        "characters": len(text),
        "file_name": filename,
    }
|