# Job Lifecycle
Every maintenance job in Snowpack follows a fenced execution lifecycle. The fence — an `attempt_id` generated per claim — ensures that at most one worker can write terminal state for a given job, even under pod eviction, network partitions, or stale-reclaim races.
## Lifecycle sequence
```mermaid
sequenceDiagram
    participant Client
    participant API as FastAPI API
    participant PG as Postgres
    participant KEDA as KEDA ScaledJob
    participant Worker as Worker Pod
    participant Spark as Spark / Kyuubi
    Client->>API: POST /tables/{db}/{table}/maintenance
    API->>API: Check drain_mode (503 if quiesced)
    API->>API: Validate actions + lookup table in cache
    API->>PG: try_lock_table (ownership-checked)
    Note over PG: 409 if another job holds the lock
    API->>PG: INSERT into jobs
    API->>PG: INSERT into job_queue
    API->>PG: history.save_job (best-effort audit)
    API-->>Client: 202 Accepted + Location header
    Note over KEDA: ~30s later, polls Postgres
    KEDA->>PG: SELECT COUNT(*) FROM job_queue<br/>WHERE claimed_at IS NULL<br/>AND visible_at <= NOW()
    Note over KEDA: depth >= 1 -> spawn worker pod
    KEDA->>Worker: Spawn pod
    Worker->>PG: dequeue_with_claim(worker_id, attempt_id)
    Note over PG: Single-tx CTE:<br/>SELECT ... FOR UPDATE SKIP LOCKED<br/>SET claimed_at, claimed_by, attempt_id,<br/>visible_at = NOW() + 1800s,<br/>attempt_count += 1<br/>UPDATE jobs SET status='running'
    Worker->>Worker: Start shutdown watcher thread
    Worker->>Worker: Start heartbeat thread (60s lease renewal)
    loop For each action (sorted)
        Worker->>Worker: Check _shutdown_requested
        Worker->>Worker: Check cancel (status=cancelled)
        Worker->>Spark: Create engine, run action
        Spark-->>Worker: Action result
    end
    Worker->>PG: store.update(results) -- writes job_actions
    Worker->>PG: store.write_terminal(attempt_id, status, error)
    Note over PG: UPDATE jobs WHERE job_id=?<br/>AND attempt_id=? (fenced)
    Worker->>PG: store.ack(attempt_id)
    Note over PG: DELETE FROM job_queue<br/>WHERE job_id=? AND attempt_id=?
    Worker->>PG: store.unlock_table(db, tbl, job_id)
    Note over PG: DELETE FROM table_locks<br/>WHERE holder = job_id
    Worker->>Worker: _job_cleanup_completed.set()
    Worker->>Worker: Exit 0 or 1
```
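The `try_lock_table` exchange near the top of the diagram is an ownership-checked insert: inserting the lock row succeeds for a free table, and a conflict is tolerated only when the existing holder is the same job. A minimal sketch using `sqlite3` (the real store targets Postgres; the `table_locks` schema here is an illustrative assumption):

```python
import sqlite3

def try_lock_table(conn: sqlite3.Connection, table_key: str, job_id: str) -> bool:
    """Ownership-checked lock: True if job_id holds the lock afterwards,
    False if a different job holds it (the API maps False to a 409).
    Illustrative sqlite3 sketch; the real store targets Postgres."""
    try:
        with conn:  # one transaction; the PRIMARY KEY enforces a single holder
            conn.execute(
                "INSERT INTO table_locks (table_key, holder) VALUES (?, ?)",
                (table_key, job_id),
            )
        return True
    except sqlite3.IntegrityError:
        # Lock row already exists -- the ownership check makes re-entry idempotent.
        row = conn.execute(
            "SELECT holder FROM table_locks WHERE table_key = ?", (table_key,)
        ).fetchone()
        return row is not None and row[0] == job_id

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE table_locks (table_key TEXT PRIMARY KEY, holder TEXT)")
```

Keeping the insert and the ownership check on one key means the 409 decision never needs a separate read-then-write race window.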
## Step-by-step walkthrough
1. **Client submits maintenance request.** `POST /tables/{db}/{table}/maintenance` hits the API. The API checks drain mode (returns 503 if quiesced), validates the requested actions, and looks up the table in the cache.
2. **API acquires table lock and enqueues.** `JobStore.try_lock_table` takes an ownership-checked lock on the table key — if another job already holds it, the API returns 409. On success the API inserts a row into `jobs` (status `pending`), inserts a row into `job_queue`, and writes a best-effort audit record via `history.save_job`. The client receives `202 Accepted` with a `Location` header pointing to the job resource.
3. **KEDA polls for work.** The KEDA `postgresql` scaler runs `SELECT COUNT(*) FROM job_queue WHERE claimed_at IS NULL AND visible_at <= NOW()` every 30 seconds. When the depth is at least 1, KEDA spawns a worker pod.
4. **Worker claims the job.** `dequeue_with_claim(worker_id, attempt_id)` runs a single-transaction CTE that atomically selects the next unclaimed job (`FOR UPDATE SKIP LOCKED`), sets `claimed_at`, `claimed_by`, and `attempt_id`, pushes `visible_at` forward by 1800 seconds (the claim lease), increments `attempt_count`, and flips the job status to `running` with the matching `attempt_id`.
5. **Worker executes actions.** A heartbeat thread renews the claim lease and table lock every 60 seconds. For each action (sorted), the worker checks `_shutdown_requested` (SIGTERM path) and `status=cancelled` (cancel path) before creating a Spark engine and running the action.
6. **Worker writes terminal state.** After all actions complete (or on early exit), the worker writes `job_actions` results, calls `write_terminal` to set the final job status (fenced by `attempt_id`), acks the queue row, unlocks the table, and signals `_job_cleanup_completed` so the shutdown watcher thread becomes a no-op. The worker then exits with code 0 (success) or 1 (failure/SIGTERM).
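The worker's claim can be sketched end to end. Postgres-specific pieces (`FOR UPDATE SKIP LOCKED`, the single CTE) don't exist in `sqlite3`, so this illustrative sketch substitutes a compare-and-set `UPDATE` that re-checks `claimed_at IS NULL` inside one transaction; the schema is an assumed minimal version of the real tables:

```python
import sqlite3
import time
import uuid

def dequeue_with_claim(conn: sqlite3.Connection, worker_id: str, lease_s: int = 1800):
    """Claim the oldest visible, unclaimed job; returns (job_id, attempt_id)
    or None. Sketch of the real single-transaction CTE: a conditional UPDATE
    re-checking claimed_at IS NULL stands in for FOR UPDATE SKIP LOCKED."""
    attempt_id = uuid.uuid4().hex  # the fence, minted once per claim
    now = time.time()
    with conn:  # one transaction covers select, claim, and status flip
        row = conn.execute(
            "SELECT job_id FROM job_queue "
            "WHERE claimed_at IS NULL AND visible_at <= ? "
            "ORDER BY visible_at LIMIT 1",
            (now,),
        ).fetchone()
        if row is None:
            return None
        (job_id,) = row
        claimed = conn.execute(
            "UPDATE job_queue SET claimed_at = ?, claimed_by = ?, attempt_id = ?, "
            "visible_at = ?, attempt_count = attempt_count + 1 "
            "WHERE job_id = ? AND claimed_at IS NULL",  # compare-and-set
            (now, worker_id, attempt_id, now + lease_s, job_id),
        ).rowcount
        if claimed != 1:
            return None  # a racing worker claimed it first
        conn.execute(
            "UPDATE jobs SET status = 'running', attempt_id = ? WHERE job_id = ?",
            (attempt_id, job_id),
        )
    return job_id, attempt_id

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE job_queue (job_id TEXT PRIMARY KEY, claimed_at REAL,
        claimed_by TEXT, attempt_id TEXT, visible_at REAL NOT NULL,
        attempt_count INTEGER NOT NULL DEFAULT 0);
    CREATE TABLE jobs (job_id TEXT PRIMARY KEY, status TEXT, attempt_id TEXT);
    INSERT INTO job_queue (job_id, visible_at) VALUES ('j1', 0);
    INSERT INTO jobs (job_id, status) VALUES ('j1', 'pending');
""")
```

Pushing `visible_at` forward at claim time is what makes a crashed worker recoverable: once the 1800-second lease lapses, the row becomes visible to the sweeper again without any explicit "unclaim" step.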
## Fence semantics (DL-197)
`attempt_id` (uuid4 hex) is generated per claim and written to both `jobs.attempt_id` and `job_queue.attempt_id`. Every post-claim write is predicated on that fence:
- `write_terminal(attempt_id=X)` — `UPDATE jobs WHERE job_id=? AND attempt_id=X`
- `ack(attempt_id=X)` — `DELETE FROM job_queue WHERE job_id=? AND attempt_id=X`
- `heartbeat_claim(attempt_id=X)` — `UPDATE job_queue ... WHERE attempt_id=X`
If the fence is rotated — by cancel writing `attempt_id=NULL`, or by the sweeper reclaiming the job — the worker’s writes match 0 rows and return `False`. The worker logs `writeback_stale_attempt` and exits cleanly.

**Invariant:** at most one `attempt_id` value ever reaches terminal state for a given `job_id`.
## SIGTERM recovery
Every worker installs a SIGTERM handler that sets `_shutdown_requested`. A daemon “shutdown watcher” thread monitors that event. On SIGTERM the watcher runs fenced cleanup:

- `write_terminal` with `status=failed` and `error="Worker received SIGTERM"`
- `ack` the queue row
- `unlock_table`
- Force `os._exit(1)`
This ensures DB state is tidied in approximately 2 seconds, even if the main thread is blocked inside a long-running Spark JDBC call. Without the watcher, the job would remain in `running` state until the 30-minute sweeper lease expires and reclaims it.