#!/usr/bin/env python3 """Simple Qdrant connectivity and write-path validator. Run this from the production host to confirm Qdrant is reachable, auth works, and a minimal create/upsert/read/delete round trip succeeds. """ import argparse import json import os import socket import time import urllib.error import urllib.request from typing import Any, Dict, List, Optional DEFAULT_URLS = ("http://127.0.0.1:6333", "http://10.11.0.105:6333") def main() -> int: parser = argparse.ArgumentParser(description="Validate Qdrant connectivity and basic operations.") parser.add_argument( "--url", action="append", help=( "Qdrant base URL to test. Repeat for multiple endpoints. " "Defaults to QDRANT_URL if set, otherwise localhost and 10.11.0.105." ), ) parser.add_argument( "--api-key", default=os.getenv("QDRANT_API_KEY", ""), help="Qdrant API key. Defaults to QDRANT_API_KEY env var.", ) parser.add_argument( "--skip-write-test", action="store_true", help="Only verify read-only endpoints and auth.", ) parser.add_argument( "--timeout", type=float, default=5.0, help="HTTP timeout in seconds (default: 5).", ) args = parser.parse_args() urls = normalized_urls(args.url) api_key = args.api_key.strip() failures = 0 for url in urls: print(f"\n== {url} ==") try: validate_endpoint(url, api_key=api_key, timeout=args.timeout, skip_write_test=args.skip_write_test) print(f"[OK] Endpoint validated: {url}") except ValidationError as exc: failures += 1 print(f"[FAIL] {url}: {exc}") print(f"\nSummary: {len(urls) - failures} OK, {failures} FAIL") return 1 if failures else 0 def normalized_urls(values: Optional[List[str]]) -> List[str]: if values: return [v.rstrip("/") for v in values] env_url = os.getenv("QDRANT_URL", "").strip() if env_url: return [env_url.rstrip("/")] return [u.rstrip("/") for u in DEFAULT_URLS] def validate_endpoint(base_url: str, api_key: str, timeout: float, skip_write_test: bool) -> None: headers = {"Content-Type": "application/json"} if api_key: headers["api-key"] = api_key live_text = http_text("GET", f"{base_url}/livez", headers=headers, timeout=timeout) ensure_health_ok(live_text, "livez") print("[OK] /livez") ready_text = http_text("GET", f"{base_url}/readyz", headers=headers, timeout=timeout) ensure_health_ok(ready_text, "readyz") print("[OK] /readyz") collections = http_json("GET", f"{base_url}/collections", headers=headers, timeout=timeout) ensure_status_ok(collections, "collections") count = len(collections.get("result", {}).get("collections", [])) print(f"[OK] /collections (count={count})") if skip_write_test: print("[OK] Write-path test skipped") return collection = temp_collection_name() created = False try: body = {"vectors": {"size": 4, "distance": "Cosine"}} create_result = http_json( "PUT", f"{base_url}/collections/{collection}", headers=headers, timeout=timeout, body=body, ) ensure_status_ok(create_result, "create collection") created = True print(f"[OK] Created temp collection: {collection}") point = {"id": 1, "vector": [0.1, 0.2, 0.3, 0.4], "payload": {"check": "qdrant-smoke"}} upsert_result = http_json( "PUT", f"{base_url}/collections/{collection}/points?wait=true", headers=headers, timeout=timeout, body={"points": [point]}, ) ensure_status_ok(upsert_result, "upsert point") print("[OK] Upserted test point") fetch_result = http_json( "POST", f"{base_url}/collections/{collection}/points", headers=headers, timeout=timeout, body={"ids": [1], "with_payload": True, "with_vector": True}, ) ensure_status_ok(fetch_result, "fetch point") points = fetch_result.get("result", []) if not points: raise ValidationError("fetch point returned empty result") if points[0].get("id") != 1: raise ValidationError(f"unexpected point id in fetch response: {points[0].get('id')!r}") print("[OK] Fetched test point") finally: if created: try: delete_result = http_json( "DELETE", f"{base_url}/collections/{collection}?timeout=30", headers=headers, timeout=timeout, ) ensure_status_ok(delete_result, "delete collection") print(f"[OK] Deleted temp collection: {collection}") except ValidationError as exc: print(f"[WARN] Could not delete temp collection {collection}: {exc}") def temp_collection_name() -> str: stamp = time.strftime("%Y%m%d%H%M%S") host = socket.gethostname().replace("_", "-").replace(".", "-") return f"qdrant_smoke_{host}_{stamp}_{os.getpid()}" def http_json(method: str, url: str, headers: Dict[str, str], timeout: float, body: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: data = None if body is not None: data = json.dumps(body).encode("utf-8") request = urllib.request.Request(url=url, method=method, data=data, headers=headers) try: with urllib.request.urlopen(request, timeout=timeout) as response: payload = response.read().decode("utf-8") except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise ValidationError(f"HTTP {exc.code} for {method} {url}: {detail.strip()}") from exc except urllib.error.URLError as exc: raise ValidationError(f"network error for {method} {url}: {exc.reason}") from exc try: return json.loads(payload) except json.JSONDecodeError as exc: raise ValidationError(f"non-JSON response for {method} {url}: {payload[:200]!r}") from exc def http_text(method: str, url: str, headers: Dict[str, str], timeout: float) -> str: request = urllib.request.Request(url=url, method=method, headers=headers) try: with urllib.request.urlopen(request, timeout=timeout) as response: return response.read().decode("utf-8", errors="replace").strip() except urllib.error.HTTPError as exc: detail = exc.read().decode("utf-8", errors="replace") raise ValidationError(f"HTTP {exc.code} for {method} {url}: {detail.strip()}") from exc except urllib.error.URLError as exc: raise ValidationError(f"network error for {method} {url}: {exc.reason}") from exc def ensure_status_ok(payload: Dict[str, Any], context: str) -> None: if payload.get("status") != "ok": raise ValidationError(f"{context} returned non-ok payload: {payload}") def ensure_health_ok(payload_text: str, context: str) -> None: text = payload_text.lower() if "passed" in text or text == "ok" or "ready" in text: return raise ValidationError(f"{context} returned unexpected payload: {payload_text!r}") class ValidationError(RuntimeError): pass if __name__ == "__main__": raise SystemExit(main())