Files
vkv/app/Services/Evaluation/EvaluationCoordinator.php
Zdeněk Burda 41e3ce6f25 Initial commit
2026-01-09 21:26:40 +01:00

373 lines
12 KiB
PHP

<?php
namespace App\Services\Evaluation;
use App\Jobs\DispatchAggregateResultsJobsJob;
use App\Jobs\DispatchBuildWorkingSetJobsJob;
use App\Jobs\DispatchMatchJobsJob;
use App\Jobs\DispatchScoreJobsJob;
use App\Jobs\DispatchUnpairedJobsJob;
use App\Jobs\FinalizeRunJob;
use App\Jobs\MatchQsoBucketJob;
use App\Jobs\DispatchParseLogsJobsJob;
use App\Jobs\PauseEvaluationRunJob;
use App\Jobs\PrepareRunJob;
use App\Models\EvaluationLock;
use App\Models\EvaluationRun;
use App\Models\EvaluationRunEvent;
use App\Models\WorkingQso;
use Illuminate\Support\Facades\Bus;
use Throwable;
/**
* Service: EvaluationCoordinator
*
* Účel:
* - Koordinátor vyhodnocovací pipeline na úrovni service layer.
* - Centralizuje orchestrace vyhodnocení (EvaluationRun) tak, aby joby
* zůstaly tenké a obsahovaly pouze řízení toku a minimální glue kód.
*
* Kontext v architektuře:
* - Joby (PrepareRunJob, DispatchParseLogsJobsJob, DispatchBuildWorkingSetJobsJob, MatchQsoBucketJob,
* ScoreGroupJob, DispatchAggregateResultsJobsJob, FinalizeRunJob) mají být co nejvíce
* bez logiky: pouze načtou kontext běhu, zavolají metodu koordinátoru/služby
* a zapíší stav/progress.
* - EvaluationCoordinator deleguje konkrétní operace na specializované služby:
* - EdiParserService (parsing EDI, validace formátu, mapování hlaviček/QSO)
* - MatchingService (matching protistanic, NIL/busted/duplicate/out-of-window)
* - ScoringService (výpočet bodů dle rule setu, multiplikátory, policy)
* - ResultsAggregationService (součty, ranking, per-log agregace)
* - EvaluationFinalizerService (finalizace běhu, publikace, uvolnění locků)
*
* Co koordinátor řeší:
* - Konzistentní práci se "scope" vyhodnocení (round/band/category/power).
* - Determinismus a idempotenci kroků (stejné vstupy => stejné výstupy).
* - Transakční hranice a bezpečné zápisy (staging vs finální tabulky).
* - Aktualizace stavu vyhodnocovacího běhu:
* - current_step
* - progress_total/progress_done
* - run events (EvaluationRunEvent) pro UI monitoring
*
* Co koordinátor nedělá:
* - Není to HTTP vrstva (žádné request/response).
* - Není to UI ani prezentace.
* - Nemá obsahovat detailní algoritmy parsingu/matchingu/scoringu;
* ty patří do dedikovaných služeb.
*
* Doporučené zásady implementace:
* - Všechny metody mají být navrženy tak, aby byly bezpečné pro opakované
* spuštění (idempotentní).
* - Vstupem je vždy identifikátor nebo instance EvaluationRun + volitelně scope.
* - Vracej strukturované výsledky (DTO/Value Objects) a drž zápisy do DB
* na jasně definovaných místech.
*/
class EvaluationCoordinator
{
/**
* Koordinátor je typicky bezstavový (stateless) a jeho závislosti jsou
* injektované přes DI container.
*
* V praxi sem budou patřit služby typu EdiParserService, MatchingService,
* ScoringService, ResultsAggregationService a případně repozitáře.
*/
public function __construct()
{
//
}
public function start(EvaluationRun $run): void
{
if ($run->isCanceled()) {
return;
}
$lockKey = $this->lockKey($run);
$lock = EvaluationLock::acquire(
key: $lockKey,
run: $run,
ttl: 7200
);
if (! $lock) {
$run->update([
'status' => 'FAILED',
'error' => 'Nelze spustit vyhodnocení: probíhá jiný běh pro stejné kolo.',
]);
$this->event($run, 'error', 'StartEvaluationRunJob selhal: lock je držen jiným během.');
return;
}
$run->update(['batch_id' => null]);
$this->transition($run, $run->status, 'RUNNING', 'start', [
'started_at' => $run->started_at ?? now(),
]);
$this->event($run, 'info', 'Spuštění vyhodnocení.', [
'step' => 'start',
'round_id' => $run->round_id,
]);
Bus::chain([
new PrepareRunJob($run->id),
new DispatchParseLogsJobsJob($run->id),
])->catch(function (Throwable $e) use ($run, $lockKey) {
$this->fail($run, $e, $lockKey);
})->onQueue('evaluation')->dispatch();
}
public function resume(EvaluationRun $run, array $options = []): bool
{
if ($run->isCanceled()) {
return false;
}
$lockKey = $this->lockKey($run);
if (! EvaluationLock::isLocked($lockKey)) {
$lock = EvaluationLock::acquire(
key: $lockKey,
run: $run,
ttl: 7200
);
if (! $lock) {
return false;
}
}
if ($run->status === 'WAITING_REVIEW_INPUT') {
$this->transition($run, 'WAITING_REVIEW_INPUT', 'RUNNING', 'resume_input');
$this->event($run, 'info', 'Pokračování po kontrole vstupů.', [
'step' => 'resume_input',
'round_id' => $run->round_id,
'user_id' => auth()->id(),
]);
$jobs = [];
if (! empty($options['rebuild_working_set'])) {
$jobs[] = new DispatchBuildWorkingSetJobsJob($run->id);
}
$jobs[] = new DispatchMatchJobsJob($run->id);
Bus::chain($jobs)->onQueue('evaluation')->dispatch();
return true;
}
if ($run->status === 'WAITING_REVIEW_MATCH') {
$this->transition($run, 'WAITING_REVIEW_MATCH', 'RUNNING', 'resume_match');
$this->event($run, 'info', 'Pokračování po kontrole matchingu.', [
'step' => 'resume_match',
'round_id' => $run->round_id,
'user_id' => auth()->id(),
]);
DispatchScoreJobsJob::dispatch($run->id)->onQueue('evaluation');
return true;
}
if ($run->status === 'WAITING_REVIEW_SCORE') {
$this->transition($run, 'WAITING_REVIEW_SCORE', 'RUNNING', 'resume_score');
$this->event($run, 'info', 'Pokračování po kontrole skóre.', [
'step' => 'resume_score',
'round_id' => $run->round_id,
'user_id' => auth()->id(),
]);
Bus::chain([
new FinalizeRunJob($run->id, $lockKey),
])->onQueue('evaluation')->dispatch();
return true;
}
return false;
}
public function fail(EvaluationRun $run, Throwable $e, ?string $lockKey = null): void
{
$run->update([
'status' => 'FAILED',
'error' => $e->getMessage(),
'finished_at' => now(),
]);
$this->event($run, 'error', "Evaluation run selhal: {$e->getMessage()}", [
'step' => 'chain',
'round_id' => $run->round_id,
]);
if ($lockKey) {
EvaluationLock::release($lockKey, $run);
}
}
public function dispatchStep(EvaluationRun $run, string $step): void
{
if ($step === 'match') {
$this->dispatchMatch($run);
return;
}
if ($step === 'score') {
$this->dispatchScore($run);
}
}
public function transition(EvaluationRun $run, string $from, string $to, ?string $step = null, array $extra = []): bool
{
if ($from !== '*' && $run->status !== $from) {
return false;
}
$payload = array_merge([
'status' => $to,
], $extra);
if ($step !== null) {
$payload['current_step'] = $step;
}
$run->update($payload);
return true;
}
public function event(EvaluationRun $run, string $level, string $message, array $context = []): void
{
EvaluationRunEvent::create([
'evaluation_run_id' => $run->id,
'level' => $level,
'message' => $message,
'context' => $context,
]);
}
public function eventInfo(EvaluationRun $run, string $message, array $context = []): void
{
$this->event($run, 'info', $message, $context);
}
public function eventWarn(EvaluationRun $run, string $message, array $context = []): void
{
$this->event($run, 'warning', $message, $context);
}
public function eventError(EvaluationRun $run, string $message, array $context = []): void
{
$this->event($run, 'error', $message, $context);
}
public function progressInit(EvaluationRun $run, int $total, int $done = 0): void
{
$run->update([
'progress_total' => $total,
'progress_done' => $done,
]);
}
public function progressTick(EvaluationRun $run, int $n = 1): void
{
EvaluationRun::where('id', $run->id)->increment('progress_done', $n);
}
protected function dispatchMatch(EvaluationRun $run): void
{
if ($run->isCanceled()) {
return;
}
$bandIds = $run->scope['band_ids'] ?? [];
if (! $bandIds) {
$bandIds = WorkingQso::where('evaluation_run_id', $run->id)
->distinct()
->pluck('band_id')
->all();
}
if (! $bandIds) {
$bandIds = [null];
}
$this->transition($run, $run->status, 'RUNNING', 'match');
$jobs = [];
foreach ($bandIds as $bandId) {
$callNorms = WorkingQso::where('evaluation_run_id', $run->id)
->when($bandId !== null, fn ($q) => $q->where('band_id', $bandId), fn ($q) => $q->whereNull('band_id'))
->distinct()
->pluck('call_norm')
->all();
if (! $callNorms) {
continue;
}
foreach ($callNorms as $callNorm) {
$jobs[] = new MatchQsoBucketJob($run->id, $bandId, $callNorm, 1);
$jobs[] = new MatchQsoBucketJob($run->id, $bandId, $callNorm, 2);
}
}
$this->progressInit($run, count($jobs) + 2, 0);
$this->event($run, 'info', 'Spuštění matchingu.', [
'step' => 'match',
'round_id' => $run->round_id,
'step_progress_done' => 0,
'step_progress_total' => count($jobs),
]);
$next = function () use ($run) {
Bus::chain([
new DispatchUnpairedJobsJob($run->id),
])->onQueue('evaluation')->dispatch();
};
if (! $jobs) {
$next();
return;
}
$batch = Bus::batch($jobs)
->then($next)
->onQueue('evaluation')
->dispatch();
$run->update(['batch_id' => $batch->id]);
}
protected function dispatchScore(EvaluationRun $run): void
{
if ($run->isCanceled()) {
return;
}
$groups = $run->scope['groups'] ?? [
[
'key' => 'all',
'band_id' => null,
'category_id' => null,
'power_category_id' => null,
],
];
$this->transition($run, $run->status, 'RUNNING', 'score');
$this->progressInit($run, count($groups), 0);
$this->event($run, 'info', 'Spuštění scoringu.', [
'step' => 'score',
'round_id' => $run->round_id,
'step_progress_done' => 0,
'step_progress_total' => count($groups),
]);
$jobs = [];
foreach ($groups as $group) {
$jobs[] = new \App\Jobs\ScoreGroupJob(
$run->id,
$group['key'] ?? 'all',
$group
);
}
$batch = Bus::batch($jobs)
->then(function () use ($run) {
Bus::chain([
new DispatchAggregateResultsJobsJob($run->id),
])->onQueue('evaluation')->dispatch();
})
->onQueue('evaluation')
->dispatch();
$run->update(['batch_id' => $batch->id]);
}
protected function lockKey(EvaluationRun $run): string
{
return "evaluation:round:{$run->round_id}";
}
}