feat: orchestrate package imports

This commit is contained in:
Keith Solomon
2026-05-01 15:12:22 -05:00
parent 425d42e4bb
commit e9f4b17229
2 changed files with 428 additions and 0 deletions
+136
View File
@@ -0,0 +1,136 @@
<?php
/**
* Orchestrates content package imports.
*
* @package WPContentSync
*/
namespace WPContentSync\Sync;
use WPContentSync\Content\ContentHandlerRegistry;
use WPContentSync\Content\ContentImportException;
use WPContentSync\Logging\LoggerInterface;
use WPContentSync\Package\ContentPackage;
use WPContentSync\Settings\SettingsRepository;
final class SyncEngine {
private ContentHandlerRegistry $handlers;
private SyncStateRepository $state_repository;
private SettingsRepository $settings_repository;
private LoggerInterface $logger;
public function __construct(
ContentHandlerRegistry $handlers,
SyncStateRepository $state_repository,
SettingsRepository $settings_repository,
LoggerInterface $logger
) {
$this->handlers = $handlers;
$this->state_repository = $state_repository;
$this->settings_repository = $settings_repository;
$this->logger = $logger;
}
public function importPackage( ContentPackage $package ): SyncResult {
$operation_id = uniqid( 'wpcs_', true );
$total = $this->totalRecords( $package );
$processed = 0;
$results = array();
$context = SyncContext::forImport(
$package->source(),
$package->destination(),
$this->settings_repository->get()->conflictStrategy(),
$operation_id
);
$this->logger->info(
'Starting content package import.',
array(
'operation_id' => $operation_id,
'total' => $total,
)
);
foreach ( $this->handlers->ordered() as $handler ) {
$bucket = $handler->bucket();
$records = $this->recordsForBucket( $package, $bucket );
$this->state_repository->save( SyncOperationState::running( $operation_id, $bucket, $processed, $total ) );
try {
$results[] = $handler->importRecords( $records, $context );
} catch ( ContentImportException $exception ) {
$this->state_repository->save(
SyncOperationState::fromArray(
array(
'operation_id' => $operation_id,
'status' => 'failed',
'current_bucket' => $bucket,
'processed' => $processed,
'total' => $total,
)
)
);
$this->logger->error(
'Content package import failed.',
array(
'operation_id' => $operation_id,
'bucket' => $exception->bucket(),
'record' => $exception->record(),
'error' => $exception->getMessage(),
)
);
$results[] = SyncResult::failure( array( $exception->getMessage() ) );
return SyncResult::merge( $results );
}
$processed += count( $records );
}
$result = SyncResult::merge( $results );
$this->state_repository->save( SyncOperationState::completed( $operation_id, $processed, $total ) );
$this->logger->info(
'Completed content package import.',
array_merge(
array( 'operation_id' => $operation_id ),
$result->toArray()
)
);
return $result;
}
private function totalRecords( ContentPackage $package ): int {
$total = 0;
foreach ( $package->manifest() as $count ) {
$total += max( 0, (int) $count );
}
return $total;
}
/**
* @return array<int, array<string, mixed>>
*/
private function recordsForBucket( ContentPackage $package, string $bucket ): array {
$records = $package->records()[ $bucket ] ?? array();
if ( ! is_array( $records ) ) {
return array();
}
return array_values(
array_filter(
$records,
static function ( $record ): bool {
return is_array( $record );
}
)
);
}
}
+292
View File
@@ -0,0 +1,292 @@
<?php
/**
* Tests for sync engine import orchestration.
*
* @package WPContentSync
*/
namespace WPContentSync\Tests\Unit\Sync;
use PHPUnit\Framework\TestCase;
use WPContentSync\Content\ContentHandlerInterface;
use WPContentSync\Content\ContentHandlerRegistry;
use WPContentSync\Content\ContentImportException;
use WPContentSync\Logging\LoggerInterface;
use WPContentSync\Package\ContentPackage;
use WPContentSync\Settings\SettingsRepository;
use WPContentSync\Sync\SyncContext;
use WPContentSync\Sync\SyncEngine;
use WPContentSync\Sync\SyncResult;
use WPContentSync\Sync\SyncStateRepository;
class SyncEngineTest extends TestCase {
/** @var \ArrayObject<int, array<string, mixed>> */
private \ArrayObject $call_recorder;
/** @var \ArrayObject<int, array<string, mixed>> */
private \ArrayObject $log_recorder;
protected function setUp(): void {
$this->call_recorder = new \ArrayObject();
$this->log_recorder = new \ArrayObject();
parent::setUp();
}
protected function tearDown(): void {
unset(
$GLOBALS['wpcs_test_options'],
$GLOBALS['wpcs_test_option_autoloads'],
$GLOBALS['wpcs_test_transients'],
$GLOBALS['wpcs_test_transient_expiration']
);
parent::tearDown();
}
public function test_it_calls_handlers_in_registry_order_and_merges_results(): void {
$engine = $this->engine(
array(
$this->handler( 'media', SyncResult::success( array( 'created' => 1 ) ) ),
$this->handler( 'posts', SyncResult::success( array( 'updated' => 2 ) ) ),
$this->handler(
'terms',
SyncResult::success(
array(
'skipped' => 1,
'conflicts' => 1,
)
)
),
)
);
$result = $engine->importPackage( $this->package() );
self::assertTrue( $result->isSuccessful() );
self::assertSame( 1, $result->created() );
self::assertSame( 2, $result->updated() );
self::assertSame( 1, $result->skipped() );
self::assertSame( 1, $result->conflicts() );
$calls = $this->call_recorder->getArrayCopy();
self::assertSame( array( 'terms', 'posts', 'media' ), array_column( $calls, 'bucket' ) );
self::assertSame( array( array( 'id' => 20 ) ), $calls[0]['records'] );
self::assertSame( array( array( 'id' => 10 ) ), $calls[1]['records'] );
self::assertSame( array( array( 'id' => 30 ) ), $calls[2]['records'] );
self::assertSame( 'manual_review', $calls[0]['context']->conflictStrategy() );
}
public function test_it_saves_running_and_completed_state(): void {
$engine = $this->engine(
array(
$this->handler( 'posts', SyncResult::success( array( 'created' => 1 ) ) ),
$this->handler( 'media', SyncResult::success( array( 'created' => 1 ) ) ),
)
);
$engine->importPackage( $this->package() );
$states = array_values( $GLOBALS['wpcs_test_transients'] );
self::assertCount( 1, $GLOBALS['wpcs_test_transients'] );
self::assertSame( 'completed', $states[0]['status'] );
self::assertSame( 2, $states[0]['processed'] );
self::assertSame( 3, $states[0]['total'] );
}
public function test_it_logs_operation_start_and_completion(): void {
$engine = $this->engine(
array(
$this->handler( 'posts', SyncResult::success( array( 'created' => 1 ) ) ),
)
);
$engine->importPackage( $this->package() );
$logs = $this->log_recorder->getArrayCopy();
self::assertSame( 'Starting content package import.', $logs[0]['message'] );
self::assertSame( 'Completed content package import.', $logs[1]['message'] );
self::assertSame( 1, $logs[1]['context']['created'] );
}
public function test_it_returns_failure_when_handler_throws_import_exception(): void {
$engine = $this->engine(
array(
$this->throwingHandler( 'posts' ),
$this->handler( 'media', SyncResult::success( array( 'created' => 1 ) ) ),
)
);
$result = $engine->importPackage( $this->package() );
$states = array_values( $GLOBALS['wpcs_test_transients'] );
$logs = $this->log_recorder->getArrayCopy();
self::assertFalse( $result->isSuccessful() );
self::assertSame( array( 'Posts failed.' ), $result->errors() );
self::assertSame( 'failed', $states[0]['status'] );
self::assertSame( 'Content package import failed.', $logs[1]['message'] );
self::assertSame( 'posts', $logs[1]['context']['bucket'] );
}
/**
* @param array<int, ContentHandlerInterface> $handlers Handlers.
*/
private function engine( array $handlers ): SyncEngine {
update_option(
SettingsRepository::OPTION_NAME,
array(
'conflict_strategy' => 'manual_review',
),
false
);
return new SyncEngine(
new ContentHandlerRegistry( $handlers ),
new SyncStateRepository(),
new SettingsRepository(),
$this->logger()
);
}
private function package(): ContentPackage {
return ContentPackage::fromArray(
array(
'source' => array(
'site_url' => 'https://source.test',
),
'destination' => array(
'site_url' => 'https://destination.test',
),
'manifest' => array(
'posts' => 1,
'terms' => 1,
'media' => 1,
),
'records' => array(
'posts' => array(
array( 'id' => 10 ),
),
'terms' => array(
array( 'id' => 20 ),
),
'media' => array(
array( 'id' => 30 ),
),
),
)
);
}
private function handler( string $bucket, SyncResult $result ): ContentHandlerInterface {
return new class( $bucket, $result, $this->call_recorder ) implements ContentHandlerInterface {
private string $bucket;
private SyncResult $result;
/** @var \ArrayObject<int, array<string, mixed>> */
private \ArrayObject $recorder;
/**
* @param \ArrayObject<int, array<string, mixed>> $recorder Call recorder.
*/
public function __construct( string $bucket, SyncResult $result, \ArrayObject $recorder ) {
$this->bucket = $bucket;
$this->result = $result;
$this->recorder = $recorder;
}
public function bucket(): string {
return $this->bucket;
}
public function importRecords( array $records, SyncContext $context ): SyncResult {
$this->recorder->append(
array(
'bucket' => $this->bucket,
'records' => $records,
'context' => $context,
)
);
return $this->result;
}
};
}
private function throwingHandler( string $bucket ): ContentHandlerInterface {
return new class( $bucket, $this->call_recorder ) implements ContentHandlerInterface {
private string $bucket;
/** @var \ArrayObject<int, array<string, mixed>> */
private \ArrayObject $recorder;
/**
* @param \ArrayObject<int, array<string, mixed>> $recorder Call recorder.
*/
public function __construct( string $bucket, \ArrayObject $recorder ) {
$this->bucket = $bucket;
$this->recorder = $recorder;
}
public function bucket(): string {
return $this->bucket;
}
public function importRecords( array $records, SyncContext $context ): SyncResult {
$this->recorder->append(
array(
'bucket' => $this->bucket,
'records' => $records,
'context' => $context,
)
);
throw new ContentImportException( $this->bucket, array( 'id' => 10 ), 'Posts failed.' );
}
};
}
private function logger(): LoggerInterface {
return new class( $this->log_recorder ) implements LoggerInterface {
/** @var \ArrayObject<int, array<string, mixed>> */
private \ArrayObject $recorder;
/**
* @param \ArrayObject<int, array<string, mixed>> $recorder Log recorder.
*/
public function __construct( \ArrayObject $recorder ) {
$this->recorder = $recorder;
}
public function error( string $message, array $context = array() ): void {
$this->record( 'error', $message, $context );
}
public function warning( string $message, array $context = array() ): void {
$this->record( 'warning', $message, $context );
}
public function info( string $message, array $context = array() ): void {
$this->record( 'info', $message, $context );
}
public function debug( string $message, array $context = array() ): void {
$this->record( 'debug', $message, $context );
}
/**
* @param array<string, mixed> $context Context.
*/
private function record( string $level, string $message, array $context ): void {
$this->recorder->append(
array(
'level' => $level,
'message' => $message,
'context' => $context,
)
);
}
};
}
}