226 lines
8.8 KiB
Python
226 lines
8.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, Iterable, List, Optional, Protocol, Sequence
|
|
|
|
from .ingest import deduplicate_documents
|
|
from .models import IndexDocument
|
|
from .redmine import RedmineMapper
|
|
|
|
|
|
class RedmineRefreshSource(Protocol):
    """Structural interface for the Redmine client that feeds a refresh.

    ``RedmineRefreshService`` temporarily overwrites ``project_identifier``
    while it walks several projects and restores it afterwards. When an
    implementation also offers ``recent_issue_summaries(limit)`` or
    ``issue_detail(issue_id)``, the service prefers those methods.
    """

    # Project scope used by the client; the type allows ``None``.
    project_identifier: str | None

    def recent_helpdesk_issues(self, limit: int) -> Iterable[Dict[str, Any]]:
        """Return recent issues as dicts; ``limit`` presumably caps the count.

        The refresh service reads ``"id"`` from each dict and, when present,
        ``"updated_on"``.
        """
|
|
|
|
|
|
class RefreshEmbedder(Protocol):
    """Structural interface for the component that turns documents into vectors."""

    def embed_documents(self, docs: Sequence[IndexDocument]) -> List[List[float]]:
        """Return embedding vectors for ``docs``.

        The refresh service passes the result straight to ``RefreshStore.upsert``
        together with the same ``docs``. One vector per document, in matching
        order, is therefore expected (TODO confirm against implementations).
        """
|
|
|
|
|
|
class RefreshStore(Protocol):
    """Structural interface for the vector store the refresh service reads and writes."""

    def list_documents(
        self,
        limit: int = 10,
        source: Optional[str] = None,
        project_identifier: Optional[str] = None,
        doc_type: Optional[str] = None,
        issue_id: Optional[int] = None,
    ) -> List[Dict[str, Any]]:
        """Return stored documents as dicts.

        The documents are presumably filtered by every criterion that is not
        ``None``. The refresh service reads ``"id"`` from each result and, when
        present, ``"payload"["source_hash"]``.
        """

    def upsert(self, docs: Sequence[IndexDocument], vectors: Sequence[Sequence[float]]) -> None:
        """Store ``docs`` with their embedding ``vectors``.

        The two sequences are presumably paired by position, with insert-or-replace
        semantics by document id (TODO confirm).
        """

    def delete_documents(self, document_ids: Sequence[str]) -> None:
        """Remove the documents whose ids appear in ``document_ids``."""
|
|
|
|
|
|
class FileRefreshState:
    """JSON-file store of per-project refresh watermarks.

    ``mark_success`` writes the file in this layout::

        {"projects": {"<project>": {"last_successful_refresh_at": "<ISO-8601>"}}}
    """

    def __init__(self, path: Path) -> None:
        # Location of the JSON state file; parent directories are created on first write.
        self.path = path

    def load(self) -> Dict[str, Any]:
        """Return the parsed state, or ``{}`` when the file is missing or empty.

        Raises:
            json.JSONDecodeError: if the file holds non-empty, malformed JSON.
        """
        try:
            text = self.path.read_text(encoding="utf-8")
        except FileNotFoundError:
            return {}
        # An empty (or whitespace-only) file carries no state; treat it like a missing one
        # instead of failing every subsequent refresh with a JSON decode error.
        if not text.strip():
            return {}
        return json.loads(text)

    def mark_success(self, project_identifier: str, timestamp: Optional[str] = None) -> None:
        """Record ``timestamp`` as the last successful refresh of ``project_identifier``.

        Args:
            project_identifier: Key under ``"projects"`` to update; other projects'
                entries are preserved.
            timestamp: ISO-8601 string to store; defaults to the current UTC time.
        """
        payload = self.load()
        payload.setdefault("projects", {})
        payload["projects"][project_identifier] = {
            "last_successful_refresh_at": timestamp or datetime.now(timezone.utc).isoformat()
        }
        self.path.parent.mkdir(parents=True, exist_ok=True)
        # Write to a sibling temp file, then rename it over the target. A crash mid-write
        # then leaves the previous state intact rather than a truncated file.
        tmp_path = self.path.with_name(self.path.name + ".tmp")
        tmp_path.write_text(json.dumps(payload, indent=2, sort_keys=True) + "\n", encoding="utf-8")
        tmp_path.replace(self.path)
|
|
|
|
|
|
class RedmineRefreshService:
    """Incrementally re-index Redmine issues into a vector store.

    Each project is processed as follows:

    1. List recent issue summaries from ``source``.
    2. Skip issues whose ``updated_on`` predates the refresh window derived from
       ``state``.
    3. Map the remaining issues to documents with ``mapper``.
    4. Compare those documents with what ``store`` already holds, by document id
       and ``payload["source_hash"]``.

    New and changed documents are embedded and upserted. Documents that the store
    still holds for a refreshed issue, but that the mapper no longer produces, are
    deleted as stale.
    """

    # Counters reported for every project and summed into the run totals.
    _COUNTER_KEYS = (
        "issues",
        "scanned_issues",
        "detail_fetched_issues",
        "skipped_issues",
        "documents",
        "unchanged_documents",
        "changed_documents",
        "new_documents",
        "stale_documents",
        "force_rebuilt_documents",
        "would_embed_documents",
        "embedded_documents",
    )

    def __init__(
        self,
        source: RedmineRefreshSource,
        embedder: RefreshEmbedder,
        store: RefreshStore,
        mapper: Optional[RedmineMapper] = None,
        state: Optional[FileRefreshState] = None,
    ) -> None:
        self.source = source
        self.embedder = embedder
        self.store = store
        # ``is None`` rather than ``or``: a caller-supplied mapper must be used even if it is falsy.
        self.mapper = mapper if mapper is not None else RedmineMapper(redmine_url="")
        # Optional persisted watermark. Without it no cutoff applies and every listed issue is processed.
        self.state = state

    def refresh_redmine_project_limits(
        self,
        project_limits: Dict[str, int],
        dry_run: bool = False,
        force_rebuild: bool = False,
        overlap_minutes: int = 15,
    ) -> Dict[str, Any]:
        """Refresh each project in ``project_limits`` and report what changed.

        Args:
            project_limits: Maps each project identifier to the ``limit`` passed
                to the source's recent-issues call for that project.
            dry_run: Only classify documents. Nothing is deleted, embedded,
                upserted, or recorded in ``state``.
            force_rebuild: Ignore the refresh window and re-embed every document
                of every scanned issue, even when its ``source_hash`` is unchanged.
            overlap_minutes: How far before the last recorded refresh the window
                starts, as a safety margin.

        Returns:
            Run metadata, the summed ``_COUNTER_KEYS`` counters, and the
            per-project results under ``"project_results"``.
        """
        previous_source_project = getattr(self.source, "project_identifier", None)
        previous_mapper_project = getattr(self.mapper, "project_identifier", None)
        project_results: List[Dict[str, Any]] = []
        totals = dict.fromkeys(self._COUNTER_KEYS, 0)
        try:
            for project, limit in project_limits.items():
                # Point the source and mapper at this project; restored in ``finally``.
                if hasattr(self.source, "project_identifier"):
                    self.source.project_identifier = project
                if hasattr(self.mapper, "project_identifier"):
                    self.mapper.project_identifier = project
                # Capture the watermark *before* listing issues, so that an issue updated
                # while this project is being refreshed still falls inside the next run's
                # window. A completion-time watermark loses such updates whenever the run
                # outlasts ``overlap_minutes``.
                started_at = datetime.now(timezone.utc).isoformat()
                project_result = self._refresh_project(project, limit, dry_run, force_rebuild, overlap_minutes)
                project_results.append(project_result)
                for key in totals:
                    totals[key] += int(project_result.get(key, 0))
                if not dry_run and self.state is not None:
                    self.state.mark_success(project, timestamp=started_at)
        finally:
            if hasattr(self.source, "project_identifier"):
                self.source.project_identifier = previous_source_project
            if hasattr(self.mapper, "project_identifier"):
                self.mapper.project_identifier = previous_mapper_project
        return {
            "source": "redmine",
            "projects": len(project_limits),
            "dry_run": dry_run,
            "force_rebuild": force_rebuild,
            "overlap_minutes": overlap_minutes,
            **totals,
            "project_results": project_results,
        }

    def _refresh_project(self, project: str, limit: int, dry_run: bool, force_rebuild: bool, overlap_minutes: int) -> Dict[str, Any]:
        """Refresh one project and return its counters.

        Issues outside the refresh window are skipped unless ``force_rebuild`` is set.
        Stale documents are deleted before new and changed ones are embedded and
        upserted. With ``dry_run`` the store is never written.
        """
        summaries = list(self._recent_issue_summaries(limit))
        result: Dict[str, Any] = {"project_identifier": project, **dict.fromkeys(self._COUNTER_KEYS, 0)}
        result["issues"] = len(summaries)
        result["scanned_issues"] = len(summaries)
        cutoff = self._cutoff_for_project(project, overlap_minutes)
        docs_to_embed: List[IndexDocument] = []
        stale_ids: List[str] = []
        processed_issue_ids: set[str] = set()
        for summary in summaries:
            if cutoff is not None and not force_rebuild and not self._issue_is_in_refresh_window(summary, cutoff):
                result["skipped_issues"] += 1
                continue
            # If the source yields the same issue more than once, process it only once.
            # Otherwise its documents would be queued twice in a single upsert batch and
            # every counter for it would be doubled.
            summary_id = summary.get("id")
            if summary_id is not None:
                if str(summary_id) in processed_issue_ids:
                    result["skipped_issues"] += 1
                    continue
                processed_issue_ids.add(str(summary_id))
            issue = self._issue_detail(summary)
            result["detail_fetched_issues"] += 1
            candidates = deduplicate_documents(self.mapper.issue_to_documents(issue))
            result["documents"] += len(candidates)
            existing = self.store.list_documents(
                limit=5000,
                source="redmine",
                project_identifier=project,
                issue_id=int(issue["id"]),
            )
            existing_by_id = {document["id"]: document for document in existing}
            candidate_by_id = {document.id: document for document in candidates}
            # Stored for this issue but no longer produced by the mapper -> stale.
            for stale_id in sorted(set(existing_by_id) - set(candidate_by_id)):
                stale_ids.append(stale_id)
                result["stale_documents"] += 1
            for document in candidates:
                existing_document = existing_by_id.get(document.id)
                if existing_document is None:
                    result["new_documents"] += 1
                    docs_to_embed.append(document)
                    continue
                existing_hash = (existing_document.get("payload") or {}).get("source_hash")
                document_hash = document.payload.get("source_hash")
                if force_rebuild:
                    result["force_rebuilt_documents"] += 1
                    docs_to_embed.append(document)
                elif existing_hash != document_hash:
                    result["changed_documents"] += 1
                    docs_to_embed.append(document)
                else:
                    result["unchanged_documents"] += 1
        result["would_embed_documents"] = len(docs_to_embed)
        if dry_run:
            return result
        if stale_ids:
            self.store.delete_documents(stale_ids)
        if docs_to_embed:
            vectors = self.embedder.embed_documents(docs_to_embed)
            self.store.upsert(docs_to_embed, vectors)
            result["embedded_documents"] = len(docs_to_embed)
        return result

    def _recent_issue_summaries(self, limit: int) -> Iterable[Dict[str, Any]]:
        """List recent issues, preferring the optional ``recent_issue_summaries`` source method."""
        if hasattr(self.source, "recent_issue_summaries"):
            return self.source.recent_issue_summaries(limit)  # type: ignore[attr-defined]
        return self.source.recent_helpdesk_issues(limit)

    def _issue_detail(self, summary: Dict[str, Any]) -> Dict[str, Any]:
        """Fetch full issue detail when the source supports it; otherwise use the summary as-is."""
        if hasattr(self.source, "issue_detail"):
            return self.source.issue_detail(int(summary["id"]))  # type: ignore[attr-defined]
        return summary

    def _cutoff_for_project(self, project: str, overlap_minutes: int) -> Optional[datetime]:
        """Return the last recorded refresh time minus ``overlap_minutes``.

        Returns ``None`` when there is no state, or no recorded time for this project.
        """
        if self.state is None:
            return None
        timestamp = ((self.state.load().get("projects") or {}).get(project) or {}).get("last_successful_refresh_at")
        if not timestamp:
            return None
        parsed = parse_redmine_datetime(timestamp)
        return parsed - timedelta(minutes=overlap_minutes)

    def _issue_is_in_refresh_window(self, issue: Dict[str, Any], cutoff: datetime) -> bool:
        """Return True if the issue was updated at or after ``cutoff``.

        Issues without ``updated_on`` are always treated as in the window.
        """
        updated_on = issue.get("updated_on")
        if not updated_on:
            return True
        return parse_redmine_datetime(str(updated_on)) >= cutoff
|
|
|
|
|
|
def parse_redmine_datetime(raw: str) -> datetime:
    """Parse an ISO-8601 timestamp into an aware UTC ``datetime``.

    A ``Z`` designator is rewritten to ``+00:00`` because
    ``datetime.fromisoformat`` accepts ``Z`` only from Python 3.11 on.
    Values with an offset are converted to UTC. Values without one are
    labelled UTC without shifting their clock time.
    """
    value = datetime.fromisoformat(raw.replace("Z", "+00:00"))
    if value.tzinfo is not None:
        return value.astimezone(timezone.utc)
    return value.replace(tzinfo=timezone.utc)
|