#!/usr/bin/env python3 """Read-only validation for the LAN Redmine test instance. This script is intended to be run after a database import and the documented post-import reset steps. It reports whether the test instance looks ready for Helpdesk and redMCP testing without changing remote state. """ import argparse import json import os import shutil import socket import subprocess from pathlib import Path from typing import Any DEFAULT_SSH_HOST = "reddev@192.168.50.170" DEFAULT_SSH_KEY = Path("/tmp/reddev") DEFAULT_REMOTE_REDMINE = "/usr/share/redmine" DEFAULT_MAILPIT_HOST = "192.168.1.105" DEFAULT_FILES_ROOT = "/var/lib/redmine/default/files" class CheckResult: def __init__(self, status, name, detail): self.status = status self.name = name self.detail = detail class RemoteRedmine: def __init__(self, ssh_host, ssh_key, remote_redmine, local=False): self.ssh_host = ssh_host self.ssh_key = ssh_key self.remote_redmine = remote_redmine self.local = local def ssh(self, remote_command): if self.local: return subprocess.run( remote_command, shell=True, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) return subprocess.run( [ "ssh", "-i", str(self.ssh_key), "-o", "IdentitiesOnly=yes", self.ssh_host, remote_command, ], universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) def mysql_json_lines(self, sql): result = self.mysql(sql) rows = [] for line in result.splitlines(): if not line.strip(): continue rows.append(json.loads(bytes.fromhex(line.strip()).decode("utf-8"))) return rows def mysql(self, sql): command = self._mysql_runner_command() shell = True if not self.local: command = [ "ssh", "-i", str(self.ssh_key), "-o", "IdentitiesOnly=yes", self.ssh_host, self._mysql_runner_command(), ] shell = False result = subprocess.run( command, input=sql, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, shell=shell, ) if result.returncode != 0: raise RuntimeError(result.stderr.strip() or "Remote MySQL command failed.") return result.stdout def _mysql_runner_command(self): ruby = ( "require 'yaml'; " "c = YAML.load_file('config/database.yml')['production']; " "ENV['MYSQL_PWD'] = c['password'].to_s; " "args = ['--batch', '--raw', '--quick', '--skip-column-names', " "'--default-character-set=utf8', '-h', c['host'].to_s, " "'-P', (c['port'] || 3306).to_s, '-u', c['username'].to_s, c['database'].to_s]; " "exec('mysql', *args)" ) return f"cd {shell_quote(self.remote_redmine)} && ruby -e {shell_quote(ruby)}" def main(): parser = argparse.ArgumentParser(description="Read-only checks for the Redmine LAN test instance.") parser.add_argument("--ssh-host", default=os.getenv("REDMINE_SSH_HOST", DEFAULT_SSH_HOST)) parser.add_argument("--ssh-key", type=Path, default=Path(os.getenv("REDMINE_SSH_KEY", str(DEFAULT_SSH_KEY)))) parser.add_argument("--local", action="store_true", help="Validate local Redmine paths/database instead of using SSH.") parser.add_argument("--remote-redmine", default=os.getenv("REDMINE_REMOTE_PATH", DEFAULT_REMOTE_REDMINE)) parser.add_argument("--mailpit-host", default=DEFAULT_MAILPIT_HOST) parser.add_argument("--files-root", default=DEFAULT_FILES_ROOT) parser.add_argument( "--composer-bin", default=os.getenv("COMPOSER_BIN"), help="Optional Composer binary or composer.phar for redMCP validation.", ) args = parser.parse_args() remote = RemoteRedmine(args.ssh_host, args.ssh_key, args.remote_redmine, local=args.local) checks = [] checks.extend(check_remote_basics(remote)) checks.extend(check_mailpit_connectivity(remote, args.mailpit_host)) checks.extend(check_files_permissions(remote, args.files_root)) checks.extend(check_database_state(remote, args.mailpit_host)) checks.append(check_composer(args.composer_bin)) for result in checks: print(f"[{result.status}] {result.name}: {result.detail}") failures = [result for result in checks if result.status == "FAIL"] warnings = [result for result in checks if result.status == "WARN"] print(f"\nSummary: {len(checks) - len(failures) - len(warnings)} OK, {len(warnings)} WARN, {len(failures)} FAIL") return 1 if failures else 0 def check_remote_basics(remote): results = [] result = remote.ssh("printf ok") if result.returncode == 0 and result.stdout == "ok": results.append(CheckResult("OK", "SSH", f"connected to {remote.ssh_host}")) else: return [CheckResult("FAIL", "SSH", (result.stderr or result.stdout).strip() or "connection failed")] path_check = remote.ssh(f"test -d {shell_quote(remote.remote_redmine)} && printf ok") if path_check.returncode == 0 and path_check.stdout == "ok": results.append(CheckResult("OK", "Remote Redmine path", remote.remote_redmine)) else: results.append(CheckResult("FAIL", "Remote Redmine path", f"missing: {remote.remote_redmine}")) tmp_check = remote.ssh( f"cd {shell_quote(remote.remote_redmine)} && " "if [ -d tmp ]; then stat -c '%U %G %a %n' tmp; test -w tmp; else exit 2; fi" ) if tmp_check.returncode == 0: results.append(CheckResult("OK", "Passenger restart directory", tmp_check.stdout.strip())) elif tmp_check.returncode == 1: results.append(CheckResult("WARN", "Passenger restart directory", "tmp exists but is not writable by SSH user")) else: results.append(CheckResult("FAIL", "Passenger restart directory", "tmp directory missing")) return results def check_mailpit_connectivity(remote, host): results = [ tcp_check("Mailpit HTTP from local", host, 8025), tcp_check("Mailpit SMTP from local", host, 1025), tcp_check("Mailpit POP3 from local", host, 1110), ] remote_command = ( "ruby -rsocket -e " + shell_quote( "host = ARGV[0]; " "{1025 => 'SMTP', 1110 => 'POP3'}.each do |port, name| " "begin; s = TCPSocket.new(host, port); s.close; puts \"#{name}=ok\"; " "rescue => e; puts \"#{name}=#{e.class}: #{e.message}\"; exit 1; end; " "end" ) + " " + shell_quote(host) ) result = remote.ssh(remote_command) if result.returncode == 0: results.append(CheckResult("OK", "Mailpit from Redmine host", result.stdout.strip().replace("\n", ", "))) else: results.append(CheckResult("FAIL", "Mailpit from Redmine host", (result.stdout or result.stderr).strip())) return results def check_files_permissions(remote, files_root): command = ( "ruby -e " + shell_quote( "require 'json'; require 'etc'; " "paths = ARGV; " "rows = paths.map do |p| " "if File.directory?(p); st = File.stat(p); " "{path: p, mode: sprintf('%o', st.mode & 07777), group_writable: (st.mode & 0020) != 0, owner: Etc.getpwuid(st.uid).name, group: Etc.getgrgid(st.gid).name}; " "else; {path: p, missing: true}; end; " "end; puts JSON.generate(rows)" ) + " " + " ".join(shell_quote(path) for path in [files_root, f"{files_root}/2026", f"{files_root}/2026/04"]) ) result = remote.ssh(command) if result.returncode != 0: return [CheckResult("FAIL", "Attachment directory permissions", (result.stderr or result.stdout).strip())] rows = json.loads(result.stdout) failures = [row for row in rows if row.get("missing") or not row.get("group_writable")] if failures: detail = "; ".join(f"{row['path']} mode={row.get('mode', 'missing')}" for row in failures) return [CheckResult("FAIL", "Attachment directory permissions", detail)] detail = "; ".join(f"{row['path']} {row['owner']}:{row['group']} {row['mode']}" for row in rows) return [CheckResult("OK", "Attachment directory permissions", detail)] def check_database_state(remote, mailpit_host): results = [] try: projects = remote.mysql_json_lines( """ SELECT HEX(CAST(JSON_OBJECT('identifier', identifier, 'id', id, 'name', name) AS CHAR)) AS document FROM projects WHERE identifier IN ('fud-helpdesk', 'fud-nohelpdesk') ORDER BY identifier; """ ) results.append(controlled_project_check(projects)) settings_rows = remote.mysql_json_lines(helpdesk_settings_sql()) failures = helpdesk_setting_failures(settings_rows, mailpit_host) if failures: results.append(CheckResult("FAIL", "Helpdesk Mailpit settings", "; ".join(failures[:8]))) else: results.append(CheckResult("OK", "Helpdesk Mailpit settings", f"{len(settings_rows)} active project(s) match")) except Exception as exc: results.append(CheckResult("FAIL", "Database checks", f"{exc.__class__.__name__}: {exc}")) return results def helpdesk_settings_sql(): return """ SELECT HEX(CAST(JSON_OBJECT( 'identifier', p.identifier, 'protocol', MAX(CASE WHEN cs.name = 'helpdesk_protocol' THEN cs.value END), 'host', MAX(CASE WHEN cs.name = 'helpdesk_host' THEN cs.value END), 'port', MAX(CASE WHEN cs.name = 'helpdesk_port' THEN cs.value END), 'username', MAX(CASE WHEN cs.name = 'helpdesk_username' THEN cs.value END), 'has_password', MAX(CASE WHEN cs.name = 'helpdesk_password' AND cs.value <> '' THEN 1 ELSE 0 END), 'apop', MAX(CASE WHEN cs.name = 'helpdesk_apop' THEN cs.value END), 'delete_unprocessed', MAX(CASE WHEN cs.name = 'helpdesk_delete_unprocessed' THEN cs.value END), 'smtp_custom', MAX(CASE WHEN cs.name = 'helpdesk_smtp_use_default_settings' THEN cs.value END), 'smtp_server', MAX(CASE WHEN cs.name = 'helpdesk_smtp_server' THEN cs.value END), 'smtp_port', MAX(CASE WHEN cs.name = 'helpdesk_smtp_port' THEN cs.value END), 'smtp_auth', MAX(CASE WHEN cs.name = 'helpdesk_smtp_authentication' THEN cs.value END), 'smtp_username', MAX(CASE WHEN cs.name = 'helpdesk_smtp_username' THEN cs.value END), 'smtp_ssl', MAX(CASE WHEN cs.name = 'helpdesk_smtp_ssl' THEN cs.value END), 'smtp_tls', MAX(CASE WHEN cs.name = 'helpdesk_smtp_tls' THEN cs.value END) ) AS CHAR)) AS document FROM projects p LEFT JOIN contacts_settings cs ON cs.project_id = p.id WHERE p.status = 1 GROUP BY p.id, p.identifier ORDER BY p.identifier; """ def helpdesk_setting_failures(rows, mailpit_host): expected = { "protocol": "pop3", "host": mailpit_host, "port": "1110", "username": "test", "has_password": 1, "apop": "0", "delete_unprocessed": "0", "smtp_custom": "1", "smtp_server": mailpit_host, "smtp_port": "1025", "smtp_auth": "", "smtp_username": "", "smtp_ssl": "0", "smtp_tls": "0", } failures = [] for row in rows: for key, value in expected.items(): if row.get(key) != value: failures.append(f"{row['identifier']} {key}={row.get(key)!r} expected {value!r}") return failures def controlled_project_check(projects): found = {project["identifier"] for project in projects} missing = {"fud-helpdesk", "fud-nohelpdesk"} - found if missing: return CheckResult( "WARN", "Controlled test projects", "optional smoke-test project(s) missing after production clone: " + ", ".join(sorted(missing)), ) return CheckResult("OK", "Controlled test projects", ", ".join(sorted(found))) def check_composer(composer_bin): if not composer_bin: return CheckResult("WARN", "Composer validation", "skipped; pass --composer-bin to enable") composer_path = Path(composer_bin) composer_on_path = shutil.which(composer_bin) if composer_on_path is None and not composer_path.exists(): return CheckResult("WARN", "Composer validation", f"{composer_bin!r} not found; skipped") command = [composer_on_path or composer_bin, "validate", "--working-dir=redMCP"] if composer_path.suffix == ".phar": php = shutil.which("php") if php is None: return CheckResult("WARN", "Composer validation", "php not found; composer.phar skipped") command = [php, composer_bin, "validate", "--working-dir=redMCP"] result = subprocess.run( command, universal_newlines=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, check=False, ) if result.returncode == 0: return CheckResult("OK", "Composer validation", "redMCP/composer.json is valid") return CheckResult("FAIL", "Composer validation", (result.stdout + result.stderr).strip()) def tcp_check(name, host, port): try: with socket.create_connection((host, port), timeout=5): return CheckResult("OK", name, f"{host}:{port}") except OSError as exc: return CheckResult("FAIL", name, f"{host}:{port} {exc.__class__.__name__}: {exc}") def shell_quote(value): return "'" + value.replace("'", "'\"'\"'") + "'" if __name__ == "__main__": raise SystemExit(main())