373 lines
12 KiB
PHP
373 lines
12 KiB
PHP
<?php
|
|
|
|
namespace App\Services\Evaluation;
|
|
|
|
use App\Jobs\DispatchAggregateResultsJobsJob;
|
|
use App\Jobs\DispatchBuildWorkingSetJobsJob;
|
|
use App\Jobs\DispatchMatchJobsJob;
|
|
use App\Jobs\DispatchScoreJobsJob;
|
|
use App\Jobs\DispatchUnpairedJobsJob;
|
|
use App\Jobs\FinalizeRunJob;
|
|
use App\Jobs\MatchQsoBucketJob;
|
|
use App\Jobs\DispatchParseLogsJobsJob;
|
|
use App\Jobs\PauseEvaluationRunJob;
|
|
use App\Jobs\PrepareRunJob;
|
|
use App\Models\EvaluationLock;
|
|
use App\Models\EvaluationRun;
|
|
use App\Models\EvaluationRunEvent;
|
|
use App\Models\WorkingQso;
|
|
use Illuminate\Support\Facades\Bus;
|
|
use Throwable;
|
|
|
|
/**
|
|
* Service: EvaluationCoordinator
|
|
*
|
|
* Účel:
|
|
* - Koordinátor vyhodnocovací pipeline na úrovni service layer.
|
|
* - Centralizuje orchestrace vyhodnocení (EvaluationRun) tak, aby joby
|
|
* zůstaly tenké a obsahovaly pouze řízení toku a minimální glue kód.
|
|
*
|
|
* Kontext v architektuře:
|
|
* - Joby (PrepareRunJob, DispatchParseLogsJobsJob, DispatchBuildWorkingSetJobsJob, MatchQsoBucketJob,
|
|
* ScoreGroupJob, DispatchAggregateResultsJobsJob, FinalizeRunJob) mají být co nejvíce
|
|
* bez logiky: pouze načtou kontext běhu, zavolají metodu koordinátoru/služby
|
|
* a zapíší stav/progress.
|
|
* - EvaluationCoordinator deleguje konkrétní operace na specializované služby:
|
|
* - EdiParserService (parsing EDI, validace formátu, mapování hlaviček/QSO)
|
|
* - MatchingService (matching protistanic, NIL/busted/duplicate/out-of-window)
|
|
* - ScoringService (výpočet bodů dle rule setu, multiplikátory, policy)
|
|
* - ResultsAggregationService (součty, ranking, per-log agregace)
|
|
* - EvaluationFinalizerService (finalizace běhu, publikace, uvolnění locků)
|
|
*
|
|
* Co koordinátor řeší:
|
|
* - Konzistentní práci se "scope" vyhodnocení (round/band/category/power).
|
|
* - Determinismus a idempotenci kroků (stejné vstupy => stejné výstupy).
|
|
* - Transakční hranice a bezpečné zápisy (staging vs finální tabulky).
|
|
* - Aktualizace stavu vyhodnocovacího běhu:
|
|
* - current_step
|
|
* - progress_total/progress_done
|
|
* - run events (EvaluationRunEvent) pro UI monitoring
|
|
*
|
|
* Co koordinátor nedělá:
|
|
* - Není to HTTP vrstva (žádné request/response).
|
|
* - Není to UI ani prezentace.
|
|
* - Nemá obsahovat detailní algoritmy parsingu/matchingu/scoringu;
|
|
* ty patří do dedikovaných služeb.
|
|
*
|
|
* Doporučené zásady implementace:
|
|
* - Všechny metody mají být navrženy tak, aby byly bezpečné pro opakované
|
|
* spuštění (idempotentní).
|
|
* - Vstupem je vždy identifikátor nebo instance EvaluationRun + volitelně scope.
|
|
* - Vracej strukturované výsledky (DTO/Value Objects) a drž zápisy do DB
|
|
* na jasně definovaných místech.
|
|
*/
|
|
class EvaluationCoordinator
|
|
{
|
|
/**
|
|
* Koordinátor je typicky bezstavový (stateless) a jeho závislosti jsou
|
|
* injektované přes DI container.
|
|
*
|
|
* V praxi sem budou patřit služby typu EdiParserService, MatchingService,
|
|
* ScoringService, ResultsAggregationService a případně repozitáře.
|
|
*/
|
|
public function __construct()
|
|
{
|
|
//
|
|
}
|
|
|
|
public function start(EvaluationRun $run): void
|
|
{
|
|
if ($run->isCanceled()) {
|
|
return;
|
|
}
|
|
$lockKey = $this->lockKey($run);
|
|
$lock = EvaluationLock::acquire(
|
|
key: $lockKey,
|
|
run: $run,
|
|
ttl: 7200
|
|
);
|
|
|
|
if (! $lock) {
|
|
$run->update([
|
|
'status' => 'FAILED',
|
|
'error' => 'Nelze spustit vyhodnocení: probíhá jiný běh pro stejné kolo.',
|
|
]);
|
|
$this->event($run, 'error', 'StartEvaluationRunJob selhal: lock je držen jiným během.');
|
|
return;
|
|
}
|
|
|
|
$run->update(['batch_id' => null]);
|
|
$this->transition($run, $run->status, 'RUNNING', 'start', [
|
|
'started_at' => $run->started_at ?? now(),
|
|
]);
|
|
$this->event($run, 'info', 'Spuštění vyhodnocení.', [
|
|
'step' => 'start',
|
|
'round_id' => $run->round_id,
|
|
]);
|
|
|
|
Bus::chain([
|
|
new PrepareRunJob($run->id),
|
|
new DispatchParseLogsJobsJob($run->id),
|
|
])->catch(function (Throwable $e) use ($run, $lockKey) {
|
|
$this->fail($run, $e, $lockKey);
|
|
})->onQueue('evaluation')->dispatch();
|
|
}
|
|
|
|
public function resume(EvaluationRun $run, array $options = []): bool
|
|
{
|
|
if ($run->isCanceled()) {
|
|
return false;
|
|
}
|
|
$lockKey = $this->lockKey($run);
|
|
if (! EvaluationLock::isLocked($lockKey)) {
|
|
$lock = EvaluationLock::acquire(
|
|
key: $lockKey,
|
|
run: $run,
|
|
ttl: 7200
|
|
);
|
|
if (! $lock) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
if ($run->status === 'WAITING_REVIEW_INPUT') {
|
|
$this->transition($run, 'WAITING_REVIEW_INPUT', 'RUNNING', 'resume_input');
|
|
$this->event($run, 'info', 'Pokračování po kontrole vstupů.', [
|
|
'step' => 'resume_input',
|
|
'round_id' => $run->round_id,
|
|
'user_id' => auth()->id(),
|
|
]);
|
|
|
|
$jobs = [];
|
|
if (! empty($options['rebuild_working_set'])) {
|
|
$jobs[] = new DispatchBuildWorkingSetJobsJob($run->id);
|
|
}
|
|
$jobs[] = new DispatchMatchJobsJob($run->id);
|
|
|
|
Bus::chain($jobs)->onQueue('evaluation')->dispatch();
|
|
return true;
|
|
}
|
|
|
|
if ($run->status === 'WAITING_REVIEW_MATCH') {
|
|
$this->transition($run, 'WAITING_REVIEW_MATCH', 'RUNNING', 'resume_match');
|
|
$this->event($run, 'info', 'Pokračování po kontrole matchingu.', [
|
|
'step' => 'resume_match',
|
|
'round_id' => $run->round_id,
|
|
'user_id' => auth()->id(),
|
|
]);
|
|
|
|
DispatchScoreJobsJob::dispatch($run->id)->onQueue('evaluation');
|
|
return true;
|
|
}
|
|
|
|
if ($run->status === 'WAITING_REVIEW_SCORE') {
|
|
$this->transition($run, 'WAITING_REVIEW_SCORE', 'RUNNING', 'resume_score');
|
|
$this->event($run, 'info', 'Pokračování po kontrole skóre.', [
|
|
'step' => 'resume_score',
|
|
'round_id' => $run->round_id,
|
|
'user_id' => auth()->id(),
|
|
]);
|
|
|
|
Bus::chain([
|
|
new FinalizeRunJob($run->id, $lockKey),
|
|
])->onQueue('evaluation')->dispatch();
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
public function fail(EvaluationRun $run, Throwable $e, ?string $lockKey = null): void
|
|
{
|
|
$run->update([
|
|
'status' => 'FAILED',
|
|
'error' => $e->getMessage(),
|
|
'finished_at' => now(),
|
|
]);
|
|
$this->event($run, 'error', "Evaluation run selhal: {$e->getMessage()}", [
|
|
'step' => 'chain',
|
|
'round_id' => $run->round_id,
|
|
]);
|
|
if ($lockKey) {
|
|
EvaluationLock::release($lockKey, $run);
|
|
}
|
|
}
|
|
|
|
public function dispatchStep(EvaluationRun $run, string $step): void
|
|
{
|
|
if ($step === 'match') {
|
|
$this->dispatchMatch($run);
|
|
return;
|
|
}
|
|
if ($step === 'score') {
|
|
$this->dispatchScore($run);
|
|
}
|
|
}
|
|
|
|
public function transition(EvaluationRun $run, string $from, string $to, ?string $step = null, array $extra = []): bool
|
|
{
|
|
if ($from !== '*' && $run->status !== $from) {
|
|
return false;
|
|
}
|
|
|
|
$payload = array_merge([
|
|
'status' => $to,
|
|
], $extra);
|
|
|
|
if ($step !== null) {
|
|
$payload['current_step'] = $step;
|
|
}
|
|
|
|
$run->update($payload);
|
|
|
|
return true;
|
|
}
|
|
|
|
public function event(EvaluationRun $run, string $level, string $message, array $context = []): void
|
|
{
|
|
EvaluationRunEvent::create([
|
|
'evaluation_run_id' => $run->id,
|
|
'level' => $level,
|
|
'message' => $message,
|
|
'context' => $context,
|
|
]);
|
|
}
|
|
|
|
public function eventInfo(EvaluationRun $run, string $message, array $context = []): void
|
|
{
|
|
$this->event($run, 'info', $message, $context);
|
|
}
|
|
|
|
public function eventWarn(EvaluationRun $run, string $message, array $context = []): void
|
|
{
|
|
$this->event($run, 'warning', $message, $context);
|
|
}
|
|
|
|
public function eventError(EvaluationRun $run, string $message, array $context = []): void
|
|
{
|
|
$this->event($run, 'error', $message, $context);
|
|
}
|
|
|
|
public function progressInit(EvaluationRun $run, int $total, int $done = 0): void
|
|
{
|
|
$run->update([
|
|
'progress_total' => $total,
|
|
'progress_done' => $done,
|
|
]);
|
|
}
|
|
|
|
public function progressTick(EvaluationRun $run, int $n = 1): void
|
|
{
|
|
EvaluationRun::where('id', $run->id)->increment('progress_done', $n);
|
|
}
|
|
|
|
protected function dispatchMatch(EvaluationRun $run): void
|
|
{
|
|
if ($run->isCanceled()) {
|
|
return;
|
|
}
|
|
$bandIds = $run->scope['band_ids'] ?? [];
|
|
if (! $bandIds) {
|
|
$bandIds = WorkingQso::where('evaluation_run_id', $run->id)
|
|
->distinct()
|
|
->pluck('band_id')
|
|
->all();
|
|
}
|
|
if (! $bandIds) {
|
|
$bandIds = [null];
|
|
}
|
|
|
|
$this->transition($run, $run->status, 'RUNNING', 'match');
|
|
$jobs = [];
|
|
foreach ($bandIds as $bandId) {
|
|
$callNorms = WorkingQso::where('evaluation_run_id', $run->id)
|
|
->when($bandId !== null, fn ($q) => $q->where('band_id', $bandId), fn ($q) => $q->whereNull('band_id'))
|
|
->distinct()
|
|
->pluck('call_norm')
|
|
->all();
|
|
|
|
if (! $callNorms) {
|
|
continue;
|
|
}
|
|
|
|
foreach ($callNorms as $callNorm) {
|
|
$jobs[] = new MatchQsoBucketJob($run->id, $bandId, $callNorm, 1);
|
|
$jobs[] = new MatchQsoBucketJob($run->id, $bandId, $callNorm, 2);
|
|
}
|
|
}
|
|
|
|
$this->progressInit($run, count($jobs) + 2, 0);
|
|
$this->event($run, 'info', 'Spuštění matchingu.', [
|
|
'step' => 'match',
|
|
'round_id' => $run->round_id,
|
|
'step_progress_done' => 0,
|
|
'step_progress_total' => count($jobs),
|
|
]);
|
|
|
|
$next = function () use ($run) {
|
|
Bus::chain([
|
|
new DispatchUnpairedJobsJob($run->id),
|
|
])->onQueue('evaluation')->dispatch();
|
|
};
|
|
|
|
if (! $jobs) {
|
|
$next();
|
|
return;
|
|
}
|
|
|
|
$batch = Bus::batch($jobs)
|
|
->then($next)
|
|
->onQueue('evaluation')
|
|
->dispatch();
|
|
$run->update(['batch_id' => $batch->id]);
|
|
}
|
|
|
|
protected function dispatchScore(EvaluationRun $run): void
|
|
{
|
|
if ($run->isCanceled()) {
|
|
return;
|
|
}
|
|
$groups = $run->scope['groups'] ?? [
|
|
[
|
|
'key' => 'all',
|
|
'band_id' => null,
|
|
'category_id' => null,
|
|
'power_category_id' => null,
|
|
],
|
|
];
|
|
|
|
$this->transition($run, $run->status, 'RUNNING', 'score');
|
|
$this->progressInit($run, count($groups), 0);
|
|
$this->event($run, 'info', 'Spuštění scoringu.', [
|
|
'step' => 'score',
|
|
'round_id' => $run->round_id,
|
|
'step_progress_done' => 0,
|
|
'step_progress_total' => count($groups),
|
|
]);
|
|
|
|
$jobs = [];
|
|
foreach ($groups as $group) {
|
|
$jobs[] = new \App\Jobs\ScoreGroupJob(
|
|
$run->id,
|
|
$group['key'] ?? 'all',
|
|
$group
|
|
);
|
|
}
|
|
|
|
$batch = Bus::batch($jobs)
|
|
->then(function () use ($run) {
|
|
Bus::chain([
|
|
new DispatchAggregateResultsJobsJob($run->id),
|
|
])->onQueue('evaluation')->dispatch();
|
|
})
|
|
->onQueue('evaluation')
|
|
->dispatch();
|
|
$run->update(['batch_id' => $batch->id]);
|
|
}
|
|
|
|
protected function lockKey(EvaluationRun $run): string
|
|
{
|
|
return "evaluation:round:{$run->round_id}";
|
|
}
|
|
}
|