API Reference¶
This section documents the public Python API for embedding Supervice or extending its functionality.
supervice.models¶
Data models used throughout the project.
HealthCheckType¶
class HealthCheckType(Enum):
NONE = "none"
TCP = "tcp"
SCRIPT = "script"
HealthCheckConfig¶
@dataclass
class HealthCheckConfig:
type: HealthCheckType = HealthCheckType.NONE
interval: int = 30
timeout: int = 10
retries: int = 3
start_period: int = 10
port: int | None = None
host: str = "127.0.0.1"
command: str | None = None
ProgramConfig¶
@dataclass
class ProgramConfig:
name: str
command: str
numprocs: int = 1
autostart: bool = True
autorestart: bool = True
startsecs: int = 1
startretries: int = 3
stopsignal: str = "TERM"
stopwaitsecs: int = 10
stdout_logfile: str | None = None
stderr_logfile: str | None = None
environment: dict[str, str] = field(default_factory=dict)
directory: str | None = None
user: str | None = None
group: str | None = None
healthcheck: HealthCheckConfig = field(default_factory=HealthCheckConfig)
SupervisorConfig¶
@dataclass
class SupervisorConfig:
logfile: str = "supervice.log"
pidfile: str = "supervice.pid"
loglevel: str = "INFO"
socket_path: str = "/tmp/supervice.sock"
shutdown_timeout: int = 30
log_maxbytes: int = 52428800 # 50MB
log_backups: int = 10
programs: list[ProgramConfig] = field(default_factory=list)
supervice.config¶
Configuration parsing and validation.
parse_config(path: str) -> SupervisorConfig¶
Parse an INI configuration file and return a validated SupervisorConfig.
Parameters:
path— Path to the configuration file
Raises:
FileNotFoundError— Config file does not existConfigValidationError— Validation error in config
Example:
from supervice.config import parse_config
config = parse_config("supervisord.conf")
for prog in config.programs:
print(f"{prog.name}: {prog.command}")
ConfigValidationError¶
class ConfigValidationError(ValueError):
pass
Raised when configuration validation fails. The error message describes the specific validation failure.
supervice.core¶
Supervisor¶
The central coordinator that manages processes, RPC, and lifecycle.
class Supervisor:
config: SupervisorConfig
processes: dict[str, Process]
groups: dict[str, list[str]]
load_config(path: str) -> None¶
Load configuration from an INI file, set up logging, create Process instances.
async run() -> None¶
Start the supervisor. Installs signal handlers, starts processes, starts RPC server, and waits for shutdown signal.
async shutdown() -> None¶
Gracefully shut down all processes, stop RPC server, release PID file lock.
async reload_config() -> dict[str, list[str]]¶
Reload configuration from disk. Returns a dict with keys added, removed,
and changed, each containing a sorted list of process names.
Example:
import asyncio
from supervice.core import Supervisor
supervisor = Supervisor()
supervisor.load_config("supervisord.conf")
asyncio.run(supervisor.run())
supervice.process¶
Process States¶
STOPPED = "STOPPED"
STARTING = "STARTING"
RUNNING = "RUNNING"
BACKOFF = "BACKOFF"
STOPPING = "STOPPING"
EXITED = "EXITED"
FATAL = "FATAL"
UNHEALTHY = "UNHEALTHY"
Process¶
Manages a single OS process lifecycle.
class Process:
config: ProgramConfig
state: str
process: asyncio.subprocess.Process | None
should_run: bool
started_at: float | None
is_healthy: bool | None
async start() -> None¶
Start the supervision task (internal lifecycle management).
async stop() -> None¶
Stop the supervision task and kill the process.
async start_process() -> None¶
Request the process to start (used by RPC). Waits up to 5 seconds for the
process to reach RUNNING state.
async stop_process() -> None¶
Request the process to stop (used by RPC). Sends the configured stop signal.
async force_kill() -> None¶
Immediately SIGKILL the process without graceful shutdown.
async kill() -> None¶
Send stop signal to the entire process group, escalate to SIGKILL after
stopwaitsecs timeout.
supervice.client¶
Controller¶
Client for communicating with the Supervice daemon over Unix socket.
class Controller:
def __init__(self, socket_path: str = "/tmp/supervice.sock"): ...
async send_command(command: str, **kwargs) -> dict¶
Send a raw RPC command and return the response.
async status() -> bool¶
Print process status table. Returns True on success.
async start_process(name: str) -> bool¶
Start a named process. Returns True on success.
async stop_process(name: str) -> bool¶
Stop a named process. Returns True on success.
async restart_process(name: str, force: bool = False) -> bool¶
Restart a named process. With force=True, uses SIGKILL. Returns True on success.
async start_group(name: str) -> bool¶
Start all processes in a group. Returns True on success.
async stop_group(name: str) -> bool¶
Stop all processes in a group. Returns True on success.
async reload() -> bool¶
Reload daemon configuration. Returns True on success.
Example:
import asyncio
from supervice.client import Controller
async def check():
ctl = Controller("/tmp/supervice.sock")
await ctl.status()
await ctl.restart_process("webapp")
asyncio.run(check())
supervice.events¶
EventType¶
class EventType(Enum):
PROCESS_STATE_STARTING = auto()
PROCESS_STATE_RUNNING = auto()
PROCESS_STATE_BACKOFF = auto()
PROCESS_STATE_STOPPING = auto()
PROCESS_STATE_EXITED = auto()
PROCESS_STATE_STOPPED = auto()
PROCESS_STATE_FATAL = auto()
PROCESS_STATE_UNKNOWN = auto()
PROCESS_STATE_UNHEALTHY = auto()
HEALTHCHECK_PASSED = auto()
HEALTHCHECK_FAILED = auto()
Event¶
@dataclass
class Event:
type: EventType
payload: dict[str, Any]
EventBus¶
Async publish/subscribe event system.
class EventBus:
def __init__(self, maxsize: int = 1000): ...
start() -> None¶
Start the event processing task.
async stop() -> None¶
Stop the event processing task.
subscribe(event_type: EventType, handler: EventHandler) -> None¶
Register a handler for an event type. Handler signature:
async def handler(event: Event) -> None
publish(event: Event) -> None¶
Publish an event to the queue. Non-blocking. If the queue is full, the oldest event is dropped.
Example:
from supervice.events import EventBus, EventType
bus = EventBus()
async def on_running(event):
print(f"{event.payload['processname']} started")
bus.subscribe(EventType.PROCESS_STATE_RUNNING, on_running)
bus.start()
supervice.health¶
HealthCheckResult¶
class HealthCheckResult:
healthy: bool
message: str
HealthChecker (ABC)¶
class HealthChecker(ABC):
async def check(self) -> HealthCheckResult: ...
TCPHealthChecker¶
Checks TCP connectivity to a host:port.
ScriptHealthChecker¶
Runs a command and checks exit code (0 = healthy).
create_health_checker(config: HealthCheckConfig) -> HealthChecker | None¶
Factory function. Returns None if health check type is NONE.
supervice.logger¶
setup_logger(level, logfile, maxbytes, backups) -> logging.Logger¶
Configure the root supervice logger with optional file rotation.
get_logger() -> logging.Logger¶
Return the global supervice logger instance.