Files
redmine/validate_qdrant.py
Jason Thistlethwaite bd26c8894f Add production rollout tooling and semantic index ops docs
Capture the production plugin rollout workflow and Qdrant validation steps so operations stay repeatable. Also harden redMCP stdio/schema compatibility to keep diverse MCP clients and validators working.
2026-05-06 22:18:02 -04:00

209 lines
7.3 KiB
Python
Executable File

#!/usr/bin/env python3
"""Simple Qdrant connectivity and write-path validator.
Run this from the production host to confirm Qdrant is reachable, auth works,
and a minimal create/upsert/read/delete round trip succeeds.
"""
import argparse
import json
import os
import socket
import time
import urllib.error
import urllib.request
from typing import Any, Dict, List, Optional
# Fallback endpoints used by normalized_urls() when neither --url nor the
# QDRANT_URL environment variable supplies a target.
DEFAULT_URLS = ("http://127.0.0.1:6333", "http://10.11.0.105:6333")
def main() -> int:
    """Parse CLI options, validate each target endpoint, and print a summary.

    Every endpoint is checked independently; a ValidationError on one target
    is reported as a failure and does not stop the remaining targets.

    Returns:
        1 if at least one endpoint failed validation, otherwise 0.
    """
    cli = argparse.ArgumentParser(description="Validate Qdrant connectivity and basic operations.")
    cli.add_argument(
        "--url",
        action="append",
        help=(
            "Qdrant base URL to test. Repeat for multiple endpoints. "
            "Defaults to QDRANT_URL if set, otherwise localhost and 10.11.0.105."
        ),
    )
    cli.add_argument(
        "--api-key",
        default=os.getenv("QDRANT_API_KEY", ""),
        help="Qdrant API key. Defaults to QDRANT_API_KEY env var.",
    )
    cli.add_argument(
        "--skip-write-test",
        action="store_true",
        help="Only verify read-only endpoints and auth.",
    )
    cli.add_argument(
        "--timeout",
        type=float,
        default=5.0,
        help="HTTP timeout in seconds (default: 5).",
    )
    options = cli.parse_args()

    targets = normalized_urls(options.url)
    key = options.api_key.strip()

    # Remember which targets failed; the count drives both the summary line
    # and the process exit status.
    failed = []
    for target in targets:
        print(f"\n== {target} ==")
        try:
            validate_endpoint(
                target,
                api_key=key,
                timeout=options.timeout,
                skip_write_test=options.skip_write_test,
            )
        except ValidationError as problem:
            failed.append(target)
            print(f"[FAIL] {target}: {problem}")
        else:
            print(f"[OK] Endpoint validated: {target}")

    passed = len(targets) - len(failed)
    print(f"\nSummary: {passed} OK, {len(failed)} FAIL")
    return 1 if failed else 0
def normalized_urls(values: Optional[List[str]]) -> List[str]:
    """Resolve the list of Qdrant base URLs to validate.

    Precedence: explicit ``values`` (the repeated ``--url`` option), then the
    ``QDRANT_URL`` environment variable, then ``DEFAULT_URLS``.

    Each URL has surrounding whitespace and trailing slashes removed so that
    callers can append request paths directly. Entries that are blank after
    cleaning are dropped; a blank base URL would otherwise turn into a
    scheme-less request URL such as ``/livez``.

    Args:
        values: URLs given on the command line, or None when none were given.

    Returns:
        A list of cleaned base URLs.
    """
    cleaned = [value.strip().rstrip("/") for value in values or []]
    explicit = [value for value in cleaned if value]
    if explicit:
        return explicit

    env_url = os.getenv("QDRANT_URL", "").strip().rstrip("/")
    if env_url:
        return [env_url]

    return [u.rstrip("/") for u in DEFAULT_URLS]
def validate_endpoint(base_url: str, api_key: str, timeout: float, skip_write_test: bool) -> None:
    """Run the full validation sequence against one Qdrant endpoint.

    Steps, in order: GET /livez, GET /readyz, GET /collections, and then
    (unless ``skip_write_test`` is set) a create / upsert / fetch / delete
    round trip on a temporary collection.

    Args:
        base_url: Endpoint base URL without a trailing slash.
        api_key: Sent as the ``api-key`` header when non-empty.
        timeout: Per-request HTTP timeout in seconds.
        skip_write_test: When true, stop after the read-only checks.

    Raises:
        ValidationError: If any request or response check fails. A failure
            to delete the temporary collection is only printed as a warning.
    """
    request_headers = {"Content-Type": "application/json"}
    if api_key:
        request_headers["api-key"] = api_key

    # Liveness first, then readiness; both are plain-text health endpoints.
    for probe in ("livez", "readyz"):
        probe_text = http_text("GET", f"{base_url}/{probe}", headers=request_headers, timeout=timeout)
        ensure_health_ok(probe_text, probe)
        print(f"[OK] /{probe}")

    listing = http_json("GET", f"{base_url}/collections", headers=request_headers, timeout=timeout)
    ensure_status_ok(listing, "collections")
    existing = listing.get("result", {}).get("collections", [])
    print(f"[OK] /collections (count={len(existing)})")

    if skip_write_test:
        print("[OK] Write-path test skipped")
        return

    scratch = temp_collection_name()
    scratch_url = f"{base_url}/collections/{scratch}"
    # Only clean up once the server has confirmed the collection exists.
    needs_cleanup = False
    try:
        creation = http_json(
            "PUT",
            scratch_url,
            headers=request_headers,
            timeout=timeout,
            body={"vectors": {"size": 4, "distance": "Cosine"}},
        )
        ensure_status_ok(creation, "create collection")
        needs_cleanup = True
        print(f"[OK] Created temp collection: {scratch}")

        test_point = {"id": 1, "vector": [0.1, 0.2, 0.3, 0.4], "payload": {"check": "qdrant-smoke"}}
        upserted = http_json(
            "PUT",
            f"{scratch_url}/points?wait=true",
            headers=request_headers,
            timeout=timeout,
            body={"points": [test_point]},
        )
        ensure_status_ok(upserted, "upsert point")
        print("[OK] Upserted test point")

        fetched = http_json(
            "POST",
            f"{scratch_url}/points",
            headers=request_headers,
            timeout=timeout,
            body={"ids": [1], "with_payload": True, "with_vector": True},
        )
        ensure_status_ok(fetched, "fetch point")
        found = fetched.get("result", [])
        if not found:
            raise ValidationError("fetch point returned empty result")
        returned_id = found[0].get("id")
        if returned_id != 1:
            raise ValidationError(f"unexpected point id in fetch response: {returned_id!r}")
        print("[OK] Fetched test point")
    finally:
        # Best-effort cleanup: a failed delete is reported but must not mask
        # the outcome of the checks above.
        if needs_cleanup:
            try:
                removal = http_json(
                    "DELETE",
                    f"{scratch_url}?timeout=30",
                    headers=request_headers,
                    timeout=timeout,
                )
                ensure_status_ok(removal, "delete collection")
                print(f"[OK] Deleted temp collection: {scratch}")
            except ValidationError as problem:
                print(f"[WARN] Could not delete temp collection {scratch}: {problem}")
def temp_collection_name() -> str:
    """Build a name for the throwaway smoke-test collection.

    The name combines a fixed prefix, the local hostname (with ``_`` and ``.``
    replaced by ``-``), a second-resolution local timestamp, and this
    process's PID.
    """
    hostname = socket.gethostname()
    for separator in ("_", "."):
        hostname = hostname.replace(separator, "-")
    timestamp = time.strftime("%Y%m%d%H%M%S")
    return "_".join(("qdrant_smoke", hostname, timestamp, str(os.getpid())))
def http_json(method: str, url: str, headers: Dict[str, str], timeout: float, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Send an HTTP request and return the response body as a JSON object.

    Args:
        method: HTTP method, e.g. "GET" or "PUT".
        url: Absolute request URL.
        headers: Request headers to send.
        timeout: Timeout in seconds passed to ``urlopen``.
        body: Optional payload; serialized to JSON and sent as UTF-8 bytes.

    Returns:
        The decoded JSON object from the response body.

    Raises:
        ValidationError: On an invalid URL, an HTTP error status, any
            transport-level failure, or a response body that is not a JSON
            object.
    """
    # Local import: only needed for the exception type caught below.
    import http.client

    data = json.dumps(body).encode("utf-8") if body is not None else None

    try:
        request = urllib.request.Request(url=url, method=method, data=data, headers=headers)
    except ValueError as exc:
        # Request() rejects URLs without a scheme (e.g. a blank base URL).
        raise ValidationError(f"invalid URL for {method} {url}: {exc}") from exc

    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            # errors="replace" keeps a malformed body from raising
            # UnicodeDecodeError; it will fail JSON parsing below instead.
            payload = response.read().decode("utf-8", errors="replace")
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise ValidationError(f"HTTP {exc.code} for {method} {url}: {detail.strip()}") from exc
    except urllib.error.URLError as exc:
        raise ValidationError(f"network error for {method} {url}: {exc.reason}") from exc
    except (OSError, http.client.HTTPException) as exc:
        # Timeouts/resets while reading the response and malformed HTTP
        # replies (e.g. a non-HTTP service on the port) are not wrapped in
        # URLError by urllib, so convert them here.
        reason = str(exc) or type(exc).__name__
        raise ValidationError(f"network error for {method} {url}: {reason}") from exc

    try:
        parsed = json.loads(payload)
    except json.JSONDecodeError as exc:
        raise ValidationError(f"non-JSON response for {method} {url}: {payload[:200]!r}") from exc

    # Callers use dict methods on the result, so reject arrays/scalars here
    # with a clear message instead of failing later with AttributeError.
    if not isinstance(parsed, dict):
        raise ValidationError(f"non-object JSON response for {method} {url}: {payload[:200]!r}")
    return parsed
def http_text(method: str, url: str, headers: Dict[str, str], timeout: float) -> str:
    """Send an HTTP request and return the response body as stripped text.

    Args:
        method: HTTP method, e.g. "GET".
        url: Absolute request URL.
        headers: Request headers to send.
        timeout: Timeout in seconds passed to ``urlopen``.

    Returns:
        The response body decoded as UTF-8 (undecodable bytes replaced) with
        leading and trailing whitespace removed.

    Raises:
        ValidationError: On an invalid URL, an HTTP error status, or any
            transport-level failure.
    """
    # Local import: only needed for the exception type caught below.
    import http.client

    try:
        request = urllib.request.Request(url=url, method=method, headers=headers)
    except ValueError as exc:
        # Request() rejects URLs without a scheme (e.g. a blank base URL).
        raise ValidationError(f"invalid URL for {method} {url}: {exc}") from exc

    try:
        with urllib.request.urlopen(request, timeout=timeout) as response:
            return response.read().decode("utf-8", errors="replace").strip()
    except urllib.error.HTTPError as exc:
        detail = exc.read().decode("utf-8", errors="replace")
        raise ValidationError(f"HTTP {exc.code} for {method} {url}: {detail.strip()}") from exc
    except urllib.error.URLError as exc:
        raise ValidationError(f"network error for {method} {url}: {exc.reason}") from exc
    except (OSError, http.client.HTTPException) as exc:
        # Timeouts/resets while reading the response and malformed HTTP
        # replies are not wrapped in URLError by urllib, so convert them here.
        reason = str(exc) or type(exc).__name__
        raise ValidationError(f"network error for {method} {url}: {reason}") from exc
def ensure_status_ok(payload: Dict[str, Any], context: str) -> None:
    """Require that a JSON response carries ``"status": "ok"``.

    Args:
        payload: Decoded JSON response body.
        context: Short label for the operation, used in the error message.

    Raises:
        ValidationError: If the ``status`` field is missing or not ``"ok"``.
    """
    status = payload.get("status")
    if status == "ok":
        return
    raise ValidationError(f"{context} returned non-ok payload: {payload}")
def ensure_health_ok(payload_text: str, context: str) -> None:
    """Require that a health-endpoint body looks like a success message.

    The check is case-insensitive and accepts a body that is exactly "ok" or
    that contains "passed" or "ready".

    Args:
        payload_text: Response body text from the health endpoint.
        context: Endpoint label used in the error message.

    Raises:
        ValidationError: If the body matches none of the accepted forms.
    """
    lowered = payload_text.lower()
    accepted = lowered == "ok" or any(marker in lowered for marker in ("passed", "ready"))
    if not accepted:
        raise ValidationError(f"{context} returned unexpected payload: {payload_text!r}")
class ValidationError(RuntimeError):
    """Raised when an endpoint check fails; main() reports it as [FAIL]."""
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)