stejné výstupy). * - Transakční hranice a bezpečné zápisy (staging vs finální tabulky). * - Aktualizace stavu vyhodnocovacího běhu: * - current_step * - progress_total/progress_done * - run events (EvaluationRunEvent) pro UI monitoring * * Co koordinátor nedělá: * - Není to HTTP vrstva (žádné request/response). * - Není to UI ani prezentace. * - Nemá obsahovat detailní algoritmy parsingu/matchingu/scoringu; * ty patří do dedikovaných služeb. * * Doporučené zásady implementace: * - Všechny metody mají být navrženy tak, aby byly bezpečné pro opakované * spuštění (idempotentní). * - Vstupem je vždy identifikátor nebo instance EvaluationRun + volitelně scope. * - Vracej strukturované výsledky (DTO/Value Objects) a drž zápisy do DB * na jasně definovaných místech. */ class EvaluationCoordinator { /** * Koordinátor je typicky bezstavový (stateless) a jeho závislosti jsou * injektované přes DI container. * * V praxi sem budou patřit služby typu EdiParserService, MatchingService, * ScoringService, ResultsAggregationService a případně repozitáře. */ public function __construct() { // } public function start(EvaluationRun $run): void { if ($run->isCanceled()) { return; } $lockKey = $this->lockKey($run); $lock = EvaluationLock::acquire( key: $lockKey, run: $run, ttl: 7200 ); if (! $lock) { $run->update([ 'status' => 'FAILED', 'error' => 'Nelze spustit vyhodnocení: probíhá jiný běh pro stejné kolo.', ]); $this->event($run, 'error', 'StartEvaluationRunJob selhal: lock je držen jiným během.'); return; } $run->update(['batch_id' => null]); $this->transition($run, $run->status, 'RUNNING', 'start', [ 'started_at' => $run->started_at ?? now(), ]); $this->event($run, 'info', 'Spuštění vyhodnocení.', [ 'step' => 'start', 'round_id' => $run->round_id, ]); Bus::chain([ new PrepareRunJob($run->id), new DispatchParseLogsJobsJob($run->id), ])->catch(function (Throwable $e) use ($run, $lockKey) { $this->fail($run, $e, $lockKey); })->onQueue('evaluation')->dispatch(); } public function resume(EvaluationRun $run, array $options = []): bool { if ($run->isCanceled()) { return false; } $lockKey = $this->lockKey($run); if (! EvaluationLock::isLocked($lockKey)) { $lock = EvaluationLock::acquire( key: $lockKey, run: $run, ttl: 7200 ); if (! $lock) { return false; } } if ($run->status === 'WAITING_REVIEW_INPUT') { $this->transition($run, 'WAITING_REVIEW_INPUT', 'RUNNING', 'resume_input'); $this->event($run, 'info', 'Pokračování po kontrole vstupů.', [ 'step' => 'resume_input', 'round_id' => $run->round_id, 'user_id' => auth()->id(), ]); $jobs = []; if (! empty($options['rebuild_working_set'])) { $jobs[] = new DispatchBuildWorkingSetJobsJob($run->id); } $jobs[] = new DispatchMatchJobsJob($run->id); Bus::chain($jobs)->onQueue('evaluation')->dispatch(); return true; } if ($run->status === 'WAITING_REVIEW_MATCH') { $this->transition($run, 'WAITING_REVIEW_MATCH', 'RUNNING', 'resume_match'); $this->event($run, 'info', 'Pokračování po kontrole matchingu.', [ 'step' => 'resume_match', 'round_id' => $run->round_id, 'user_id' => auth()->id(), ]); DispatchScoreJobsJob::dispatch($run->id)->onQueue('evaluation'); return true; } if ($run->status === 'WAITING_REVIEW_SCORE') { $this->transition($run, 'WAITING_REVIEW_SCORE', 'RUNNING', 'resume_score'); $this->event($run, 'info', 'Pokračování po kontrole skóre.', [ 'step' => 'resume_score', 'round_id' => $run->round_id, 'user_id' => auth()->id(), ]); Bus::chain([ new FinalizeRunJob($run->id, $lockKey), ])->onQueue('evaluation')->dispatch(); return true; } return false; } public function fail(EvaluationRun $run, Throwable $e, ?string $lockKey = null): void { $run->update([ 'status' => 'FAILED', 'error' => $e->getMessage(), 'finished_at' => now(), ]); $this->event($run, 'error', "Evaluation run selhal: {$e->getMessage()}", [ 'step' => 'chain', 'round_id' => $run->round_id, ]); if ($lockKey) { EvaluationLock::release($lockKey, $run); } } public function dispatchStep(EvaluationRun $run, string $step): void { if ($step === 'match') { $this->dispatchMatch($run); return; } if ($step === 'score') { $this->dispatchScore($run); } } public function transition(EvaluationRun $run, string $from, string $to, ?string $step = null, array $extra = []): bool { if ($from !== '*' && $run->status !== $from) { return false; } $payload = array_merge([ 'status' => $to, ], $extra); if ($step !== null) { $payload['current_step'] = $step; } $run->update($payload); return true; } public function event(EvaluationRun $run, string $level, string $message, array $context = []): void { EvaluationRunEvent::create([ 'evaluation_run_id' => $run->id, 'level' => $level, 'message' => $message, 'context' => $context, ]); } public function eventInfo(EvaluationRun $run, string $message, array $context = []): void { $this->event($run, 'info', $message, $context); } public function eventWarn(EvaluationRun $run, string $message, array $context = []): void { $this->event($run, 'warning', $message, $context); } public function eventError(EvaluationRun $run, string $message, array $context = []): void { $this->event($run, 'error', $message, $context); } public function progressInit(EvaluationRun $run, int $total, int $done = 0): void { $run->update([ 'progress_total' => $total, 'progress_done' => $done, ]); } public function progressTick(EvaluationRun $run, int $n = 1): void { EvaluationRun::where('id', $run->id)->increment('progress_done', $n); } protected function dispatchMatch(EvaluationRun $run): void { if ($run->isCanceled()) { return; } $bandIds = $run->scope['band_ids'] ?? []; if (! $bandIds) { $bandIds = WorkingQso::where('evaluation_run_id', $run->id) ->distinct() ->pluck('band_id') ->all(); } if (! $bandIds) { $bandIds = [null]; } $this->transition($run, $run->status, 'RUNNING', 'match'); $jobs = []; foreach ($bandIds as $bandId) { $callNorms = WorkingQso::where('evaluation_run_id', $run->id) ->when($bandId !== null, fn ($q) => $q->where('band_id', $bandId), fn ($q) => $q->whereNull('band_id')) ->distinct() ->pluck('call_norm') ->all(); if (! $callNorms) { continue; } foreach ($callNorms as $callNorm) { $jobs[] = new MatchQsoBucketJob($run->id, $bandId, $callNorm, 1); $jobs[] = new MatchQsoBucketJob($run->id, $bandId, $callNorm, 2); } } $this->progressInit($run, count($jobs) + 2, 0); $this->event($run, 'info', 'Spuštění matchingu.', [ 'step' => 'match', 'round_id' => $run->round_id, 'step_progress_done' => 0, 'step_progress_total' => count($jobs), ]); $next = function () use ($run) { Bus::chain([ new DispatchUnpairedJobsJob($run->id), ])->onQueue('evaluation')->dispatch(); }; if (! $jobs) { $next(); return; } $batch = Bus::batch($jobs) ->then($next) ->onQueue('evaluation') ->dispatch(); $run->update(['batch_id' => $batch->id]); } protected function dispatchScore(EvaluationRun $run): void { if ($run->isCanceled()) { return; } $groups = $run->scope['groups'] ?? [ [ 'key' => 'all', 'band_id' => null, 'category_id' => null, 'power_category_id' => null, ], ]; $this->transition($run, $run->status, 'RUNNING', 'score'); $this->progressInit($run, count($groups), 0); $this->event($run, 'info', 'Spuštění scoringu.', [ 'step' => 'score', 'round_id' => $run->round_id, 'step_progress_done' => 0, 'step_progress_total' => count($groups), ]); $jobs = []; foreach ($groups as $group) { $jobs[] = new \App\Jobs\ScoreGroupJob( $run->id, $group['key'] ?? 'all', $group ); } $batch = Bus::batch($jobs) ->then(function () use ($run) { Bus::chain([ new DispatchAggregateResultsJobsJob($run->id), ])->onQueue('evaluation')->dispatch(); }) ->onQueue('evaluation') ->dispatch(); $run->update(['batch_id' => $batch->id]); } protected function lockKey(EvaluationRun $run): string { return "evaluation:round:{$run->round_id}"; } }