#!/usr/bin/env python3
"""Read-only validation for the LAN Redmine test instance.

This script is intended to be run after a database import and the documented
post-import reset steps. It reports whether the test instance looks ready for
Helpdesk and redMCP testing without changing remote state.
"""

from __future__ import annotations

import argparse
import json
import os
import shutil
import socket
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Sequence

DEFAULT_SSH_HOST = "reddev@192.168.50.170"
DEFAULT_SSH_KEY = Path("/tmp/reddev")
DEFAULT_REMOTE_REDMINE = "/usr/share/redmine"
DEFAULT_MAILPIT_HOST = "192.168.1.105"
DEFAULT_FILES_ROOT = "/var/lib/redmine/default/files"
# Sub-directories of the files root whose permissions are checked by default.
DEFAULT_FILES_SUBDIRS = ("2026", "2026/04")


@dataclass(frozen=True)
class CheckResult:
    """Outcome of one validation check, printed as ``[status] name: detail``."""

    # "OK", "WARN" or "FAIL"; only "FAIL" makes main() return a non-zero code.
    status: str
    # Short label identifying the check.
    name: str
    # Human-readable explanation or measured value.
    detail: str


@dataclass(frozen=True)
class RemoteRedmine:
    """SSH access to the Redmine host plus helpers that query its database."""

    # SSH destination passed to the ssh client as-is.
    ssh_host: str
    # Private key file passed to ``ssh -i``.
    ssh_key: Path
    # Redmine installation directory on the remote host.
    remote_redmine: str

    def ssh(
        self,
        remote_command: str,
        stdin_text: str | None = None,
    ) -> subprocess.CompletedProcess[str]:
        """Run ``remote_command`` on the remote host and capture its output.

        Args:
            remote_command: Shell command line executed by the remote shell.
            stdin_text: Optional text fed to the remote command's stdin. When
                ``None`` the child's stdin is left untouched.

        Returns:
            The completed process with text ``stdout``/``stderr``. A non-zero
            exit status is returned, not raised. If the local ``ssh`` program
            cannot be started at all, a synthetic result with return code 127
            and the error text in ``stderr`` is returned instead of raising.
        """
        argv = [
            "ssh",
            "-i",
            str(self.ssh_key),
            "-o",
            "IdentitiesOnly=yes",
            self.ssh_host,
            remote_command,
        ]
        try:
            return subprocess.run(
                argv,
                input=stdin_text,
                text=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                check=False,
            )
        except OSError as exc:
            # E.g. no ssh client installed: report it like any other failed
            # command so callers can turn it into a FAIL line.
            return subprocess.CompletedProcess(
                argv, 127, stdout="", stderr=f"could not run ssh: {exc}"
            )

    def mysql_json_lines(self, sql: str) -> list[dict[str, Any]]:
        """Run ``sql`` and decode each non-blank output line as hex-encoded JSON.

        Every non-blank line of the result is stripped, decoded from
        hexadecimal to UTF-8 text and parsed as JSON; the queries in this file
        wrap their documents in ``HEX(...)`` to match.

        Raises:
            RuntimeError: If the remote MySQL command fails.
            ValueError: If a line is not valid hex, UTF-8 or JSON.
        """
        rows: list[dict[str, Any]] = []
        for line in self.mysql(sql).splitlines():
            encoded = line.strip()
            if encoded:
                rows.append(json.loads(bytes.fromhex(encoded).decode("utf-8")))
        return rows

    def mysql(self, sql: str) -> str:
        """Send ``sql`` to the remote ``mysql`` client on stdin and return its stdout.

        Raises:
            RuntimeError: If the remote command exits non-zero; the message is
                the command's stderr or a generic fallback.
        """
        result = self.ssh(self._mysql_runner_command(), stdin_text=sql)
        if result.returncode != 0:
            raise RuntimeError(result.stderr.strip() or "Remote MySQL command failed.")
        return result.stdout

    def _mysql_runner_command(self) -> str:
        """Build the remote command that starts ``mysql`` with Redmine's credentials.

        The Ruby one-liner reads the ``production`` entry of
        ``config/database.yml`` and hands the password to ``mysql`` through the
        ``MYSQL_PWD`` environment variable, so it never appears in the argument
        list.
        """
        ruby = (
            "require 'yaml'; "
            "c = YAML.load_file('config/database.yml')['production']; "
            "ENV['MYSQL_PWD'] = c['password'].to_s; "
            "args = ['--batch', '--raw', '--quick', '--skip-column-names', "
            "'--default-character-set=utf8', '-h', c['host'].to_s, "
            "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; "
            "exec('mysql', *args)"
        )
        return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}"


def main(argv: Sequence[str] | None = None) -> int:
    """Run every check, print one line per result plus a summary.

    Args:
        argv: Command-line arguments without the program name. ``None`` (the
            default) makes argparse read ``sys.argv``.

    Returns:
        1 if at least one check has status "FAIL", otherwise 0.
    """
    parser = argparse.ArgumentParser(description="Read-only checks for the Redmine LAN test instance.")
    parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST))
    parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY))))
    parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE))
    parser.add_argument("--mailpit-host", default=DEFAULT_MAILPIT_HOST)
    parser.add_argument("--files-root", default=DEFAULT_FILES_ROOT)
    parser.add_argument("--composer-bin", default=os.getenv("COMPOSER_BIN", "composer"))
    args = parser.parse_args(argv)

    remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine)
    checks: list[CheckResult] = []
    checks.extend(check_remote_basics(remote))
    checks.extend(check_mailpit_connectivity(remote, args.mailpit_host))
    checks.extend(check_files_permissions(remote, args.files_root))
    checks.extend(check_database_state(remote, args.mailpit_host))
    checks.append(check_composer(args.composer_bin))

    for result in checks:
        print(f"[{result.status}] {result.name}: {result.detail}")

    failures = [result for result in checks if result.status == "FAIL"]
    warnings = [result for result in checks if result.status == "WARN"]
    print(f"\nSummary: {len(checks) - len(failures) - len(warnings)} OK, {len(warnings)} WARN, {len(failures)} FAIL")
    return 1 if failures else 0


def check_remote_basics(remote: RemoteRedmine) -> list[CheckResult]:
    """Check SSH reachability, the Redmine directory and its ``tmp`` directory.

    Later checks are skipped when an earlier one makes them meaningless: a
    failed SSH connection returns only the SSH failure, and a missing Redmine
    directory returns without inspecting ``tmp`` (the ``cd`` would fail and be
    indistinguishable from a non-writable ``tmp``).
    """
    results: list[CheckResult] = []

    probe = remote.ssh("printf ok")
    if probe.returncode != 0 or probe.stdout != "ok":
        return [CheckResult("FAIL", "SSH", (probe.stderr or probe.stdout).strip() or "connection failed")]
    results.append(CheckResult("OK", "SSH", f"connected to {remote.ssh_host}"))

    path_check = remote.ssh(f"test -d {shell_quote(remote.remote_redmine)} && printf ok")
    if path_check.returncode != 0 or path_check.stdout != "ok":
        results.append(CheckResult("FAIL", "Remote Redmine path", f"missing: {remote.remote_redmine}"))
        return results
    results.append(CheckResult("OK", "Remote Redmine path", remote.remote_redmine))

    # Remote exit status: 0 = tmp exists and is writable, 1 = `test -w` failed,
    # 2 = tmp is not a directory. Anything else (e.g. ssh itself failing) is
    # reported with whatever the command printed.
    tmp_check = remote.ssh(
        f"cd {shell_quote(remote.remote_redmine)} && "
        "if [ -d tmp ]; then stat -c '%U %G %a %n' tmp; test -w tmp; else exit 2; fi"
    )
    name = "Passenger restart directory"
    if tmp_check.returncode == 0:
        results.append(CheckResult("OK", name, tmp_check.stdout.strip()))
    elif tmp_check.returncode == 1:
        results.append(CheckResult("WARN", name, "tmp exists but is not writable by SSH user"))
    elif tmp_check.returncode == 2:
        results.append(CheckResult("FAIL", name, "tmp directory missing"))
    else:
        detail = (tmp_check.stderr or tmp_check.stdout).strip()
        results.append(CheckResult("FAIL", name, detail or f"check failed with exit status {tmp_check.returncode}"))
    return results


def check_mailpit_connectivity(remote: RemoteRedmine, host: str) -> list[CheckResult]:
    """Check that Mailpit's ports accept TCP connections.

    Ports 8025, 1025 and 1110 are probed from the local machine; ports 1025
    and 1110 are additionally probed from the Redmine host through a Ruby
    one-liner, which stops at the first port that fails.
    """
    results = [
        tcp_check("Mailpit HTTP from local", host, 8025),
        tcp_check("Mailpit SMTP from local", host, 1025),
        tcp_check("Mailpit POP3 from local", host, 1110),
    ]

    probe_script = (
        "host = ARGV[0]; "
        "{1025 => 'SMTP', 1110 => 'POP3'}.each do |port, name| "
        "begin; s = TCPSocket.new(host, port); s.close; puts \"#{name}=ok\"; "
        "rescue => e; puts \"#{name}=#{e.class}: #{e.message}\"; exit 1; end; "
        "end"
    )
    remote_command = "ruby -rsocket -e " + shell_quote(probe_script) + " " + shell_quote(host)

    result = remote.ssh(remote_command)
    if result.returncode == 0:
        results.append(CheckResult("OK", "Mailpit from Redmine host", result.stdout.strip().replace("\n", ", ")))
    else:
        results.append(CheckResult("FAIL", "Mailpit from Redmine host", (result.stdout or result.stderr).strip()))
    return results


def check_files_permissions(
    remote: RemoteRedmine,
    files_root: str,
    subdirs: Sequence[str] = DEFAULT_FILES_SUBDIRS,
) -> list[CheckResult]:
    """Check that the attachment directories exist and are group-writable.

    Args:
        remote: Connection used to inspect the directories.
        files_root: Attachment root directory on the remote host.
        subdirs: Paths relative to ``files_root`` that are checked as well.

    Returns:
        A single-element list: FAIL if the remote command fails, prints
        something that is not JSON, or reports a missing or non-group-writable
        directory; OK otherwise.
    """
    name = "Attachment directory permissions"
    paths = [files_root, *(f"{files_root}/{subdir}" for subdir in subdirs)]
    stat_script = (
        "require 'json'; require 'etc'; "
        "paths = ARGV; "
        "rows = paths.map do |p| "
        "if File.directory?(p); st = File.stat(p); "
        "{path: p, mode: sprintf('%o', st.mode & 07777), group_writable: (st.mode & 0020) != 0, owner: Etc.getpwuid(st.uid).name, group: Etc.getgrgid(st.gid).name}; "
        "else; {path: p, missing: true}; end; "
        "end; puts JSON.generate(rows)"
    )
    command = "ruby -e " + shell_quote(stat_script) + " " + " ".join(shell_quote(path) for path in paths)

    result = remote.ssh(command)
    if result.returncode != 0:
        return [CheckResult("FAIL", name, (result.stderr or result.stdout).strip())]

    try:
        rows = json.loads(result.stdout)
    except json.JSONDecodeError as exc:
        # E.g. a login banner mixed into stdout: report it instead of crashing.
        return [CheckResult("FAIL", name, f"unexpected output from remote host: {exc}")]

    failures = [row for row in rows if row.get("missing") or not row.get("group_writable")]
    if failures:
        detail = "; ".join(f"{row['path']} mode={row.get('mode', 'missing')}" for row in failures)
        return [CheckResult("FAIL", name, detail)]

    detail = "; ".join(f"{row['path']} {row['owner']}:{row['group']} {row['mode']}" for row in rows)
    return [CheckResult("OK", name, detail)]


def check_database_state(remote: RemoteRedmine, mailpit_host: str) -> list[CheckResult]:
    """Check the controlled test projects and the Helpdesk mail settings.

    Any exception raised while querying or decoding is reported as one
    "Database checks" FAIL, so a database problem never aborts the script.
    When the settings query returns no project at all the result is WARN
    rather than OK, because nothing was actually validated.
    """
    results: list[CheckResult] = []
    try:
        projects = remote.mysql_json_lines(
            " SELECT HEX(CAST(JSON_OBJECT('identifier', identifier, 'id', id, 'name', name) AS CHAR)) AS document"
            " FROM projects"
            " WHERE identifier IN ('fud-helpdesk', 'fud-nohelpdesk')"
            " ORDER BY identifier; "
        )
        found = {project["identifier"] for project in projects}
        missing = {"fud-helpdesk", "fud-nohelpdesk"} - found
        if missing:
            results.append(CheckResult("FAIL", "Controlled test projects", "missing " + ", ".join(sorted(missing))))
        else:
            results.append(CheckResult("OK", "Controlled test projects", ", ".join(sorted(found))))

        settings_rows = remote.mysql_json_lines(helpdesk_settings_sql())
        failures = helpdesk_setting_failures(settings_rows, mailpit_host)
        if failures:
            # Cap the detail at eight entries to keep the output line readable.
            results.append(CheckResult("FAIL", "Helpdesk Mailpit settings", "; ".join(failures[:8])))
        elif not settings_rows:
            results.append(
                CheckResult(
                    "WARN",
                    "Helpdesk Mailpit settings",
                    "no Helpdesk-enabled projects found; settings not validated",
                )
            )
        else:
            results.append(
                CheckResult("OK", "Helpdesk Mailpit settings", f"{len(settings_rows)} project(s) match")
            )
    except Exception as exc:  # boundary: convert any failure into a report line
        results.append(CheckResult("FAIL", "Database checks", f"{exc.__class__.__name__}: {exc}"))
    return results


def helpdesk_settings_sql() -> str:
    """Return the query that yields one hex-encoded JSON document per project.

    Only projects with ``status = 1`` and the ``contacts_helpdesk`` module
    enabled are selected; their ``contacts_settings`` rows are pivoted into a
    single JSON object per project.
    """
    return (
        " SELECT HEX(CAST(JSON_OBJECT( "
        "'identifier', p.identifier, "
        "'protocol', MAX(CASE WHEN cs.name = 'helpdesk_protocol' THEN cs.value END), "
        "'host', MAX(CASE WHEN cs.name = 'helpdesk_host' THEN cs.value END), "
        "'port', MAX(CASE WHEN cs.name = 'helpdesk_port' THEN cs.value END), "
        "'username', MAX(CASE WHEN cs.name = 'helpdesk_username' THEN cs.value END), "
        "'has_password', MAX(CASE WHEN cs.name = 'helpdesk_password' AND cs.value <> '' THEN 1 ELSE 0 END), "
        "'apop', MAX(CASE WHEN cs.name = 'helpdesk_apop' THEN cs.value END), "
        "'delete_unprocessed', MAX(CASE WHEN cs.name = 'helpdesk_delete_unprocessed' THEN cs.value END), "
        "'smtp_custom', MAX(CASE WHEN cs.name = 'helpdesk_smtp_use_default_settings' THEN cs.value END), "
        "'smtp_server', MAX(CASE WHEN cs.name = 'helpdesk_smtp_server' THEN cs.value END), "
        "'smtp_port', MAX(CASE WHEN cs.name = 'helpdesk_smtp_port' THEN cs.value END), "
        "'smtp_auth', MAX(CASE WHEN cs.name = 'helpdesk_smtp_authentication' THEN cs.value END), "
        "'smtp_username', MAX(CASE WHEN cs.name = 'helpdesk_smtp_username' THEN cs.value END), "
        "'smtp_ssl', MAX(CASE WHEN cs.name = 'helpdesk_smtp_ssl' THEN cs.value END), "
        "'smtp_tls', MAX(CASE WHEN cs.name = 'helpdesk_smtp_tls' THEN cs.value END) "
        ") AS CHAR)) AS document "
        "FROM projects p "
        "JOIN enabled_modules em ON em.project_id = p.id AND em.name = 'contacts_helpdesk' "
        "LEFT JOIN contacts_settings cs ON cs.project_id = p.id "
        "WHERE p.status = 1 "
        "GROUP BY p.id, p.identifier "
        "ORDER BY p.identifier; "
    )


def helpdesk_setting_failures(rows: list[dict[str, Any]], mailpit_host: str) -> list[str]:
    """List every Helpdesk setting that differs from the expected Mailpit setup.

    Args:
        rows: Documents produced by the ``helpdesk_settings_sql()`` query; each
            must contain an ``identifier`` key.
        mailpit_host: Host expected for both the POP3 and the SMTP server.

    Returns:
        One ``"<identifier> <key>=<actual> expected <wanted>"`` string per
        mismatch; empty when every row matches. A key missing from a row counts
        as ``None`` and therefore as a mismatch.
    """
    expected = {
        "protocol": "pop3",
        "host": mailpit_host,
        "port": "1110",
        "username": "test",
        "has_password": 1,
        "apop": "0",
        "delete_unprocessed": "0",
        "smtp_custom": "1",
        "smtp_server": mailpit_host,
        "smtp_port": "1025",
        "smtp_auth": "",
        "smtp_username": "",
        "smtp_ssl": "0",
        "smtp_tls": "0",
    }
    failures: list[str] = []
    for row in rows:
        for key, wanted in expected.items():
            actual = row.get(key)
            if actual != wanted:
                failures.append(f"{row['identifier']} {key}={actual!r} expected {wanted!r}")
    return failures


def check_composer(composer_bin: str) -> CheckResult:
    """Run ``composer validate --working-dir=redMCP`` locally.

    Args:
        composer_bin: Composer executable name or path. A ``.phar`` file is
            run through the ``php`` found on PATH.

    Returns:
        WARN when Composer (or ``php`` for a ``.phar``) cannot be found, FAIL
        when the command cannot be started or exits non-zero, OK otherwise.
    """
    name = "Composer validation"
    composer_path = Path(composer_bin)
    composer_on_path = shutil.which(composer_bin)
    if composer_on_path is None and not composer_path.exists():
        return CheckResult("WARN", name, f"{composer_bin!r} not found; skipped")

    # Prefer the location resolved through PATH; `php` would otherwise look up
    # a bare "composer.phar" relative to the current directory only.
    resolved = composer_on_path or composer_bin
    command = [resolved, "validate", "--working-dir=redMCP"]
    if composer_path.suffix == ".phar":
        php = shutil.which("php")
        if php is None:
            return CheckResult("WARN", name, "php not found; composer.phar skipped")
        command = [php, resolved, "validate", "--working-dir=redMCP"]

    try:
        result = subprocess.run(
            command,
            text=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            check=False,
        )
    except OSError as exc:
        # The file exists but cannot be executed (permissions, wrong format).
        return CheckResult("FAIL", name, f"could not run {command[0]!r}: {exc}")

    if result.returncode == 0:
        return CheckResult("OK", name, "redMCP/composer.json is valid")
    return CheckResult("FAIL", name, (result.stdout + result.stderr).strip())


def tcp_check(name: str, host: str, port: int) -> CheckResult:
    """Try to open a TCP connection to ``host:port`` with a 5 second timeout.

    Returns:
        OK when the connection is established (it is closed right away), FAIL
        with the error class and message when an ``OSError`` is raised.
    """
    try:
        with socket.create_connection((host, port), timeout=5):
            return CheckResult("OK", name, f"{host}:{port}")
    except OSError as exc:
        return CheckResult("FAIL", name, f"{host}:{port} {exc.__class__.__name__}: {exc}")


def shell_quote(value: str) -> str:
    """Wrap ``value`` in single quotes for a POSIX shell.

    Embedded single quotes are emitted as ``'"'"'`` (close quote, a
    double-quoted single quote, reopen quote).
    """
    return "'" + value.replace("'", "'\"'\"'") + "'"


if __name__ == "__main__":
    raise SystemExit(main())