diff --git a/src/Sync/SyncEngine.php b/src/Sync/SyncEngine.php new file mode 100644 index 0000000..43449f8 --- /dev/null +++ b/src/Sync/SyncEngine.php @@ -0,0 +1,136 @@ +handlers = $handlers; + $this->state_repository = $state_repository; + $this->settings_repository = $settings_repository; + $this->logger = $logger; + } + + public function importPackage( ContentPackage $package ): SyncResult { + $operation_id = uniqid( 'wpcs_', true ); + $total = $this->totalRecords( $package ); + $processed = 0; + $results = array(); + $context = SyncContext::forImport( + $package->source(), + $package->destination(), + $this->settings_repository->get()->conflictStrategy(), + $operation_id + ); + + $this->logger->info( + 'Starting content package import.', + array( + 'operation_id' => $operation_id, + 'total' => $total, + ) + ); + + foreach ( $this->handlers->ordered() as $handler ) { + $bucket = $handler->bucket(); + $records = $this->recordsForBucket( $package, $bucket ); + + $this->state_repository->save( SyncOperationState::running( $operation_id, $bucket, $processed, $total ) ); + + try { + $results[] = $handler->importRecords( $records, $context ); + } catch ( ContentImportException $exception ) { + $this->state_repository->save( + SyncOperationState::fromArray( + array( + 'operation_id' => $operation_id, + 'status' => 'failed', + 'current_bucket' => $bucket, + 'processed' => $processed, + 'total' => $total, + ) + ) + ); + + $this->logger->error( + 'Content package import failed.', + array( + 'operation_id' => $operation_id, + 'bucket' => $exception->bucket(), + 'record' => $exception->record(), + 'error' => $exception->getMessage(), + ) + ); + + $results[] = SyncResult::failure( array( $exception->getMessage() ) ); + + return SyncResult::merge( $results ); + } + + $processed += count( $records ); + } + + $result = SyncResult::merge( $results ); + $this->state_repository->save( SyncOperationState::completed( $operation_id, $processed, $total ) ); + + $this->logger->info( + 'Completed content package import.', + array_merge( + array( 'operation_id' => $operation_id ), + $result->toArray() + ) + ); + + return $result; + } + + private function totalRecords( ContentPackage $package ): int { + $total = 0; + + foreach ( $package->manifest() as $count ) { + $total += max( 0, (int) $count ); + } + + return $total; + } + + /** + * @return array> + */ + private function recordsForBucket( ContentPackage $package, string $bucket ): array { + $records = $package->records()[ $bucket ] ?? array(); + + if ( ! is_array( $records ) ) { + return array(); + } + + return array_values( + array_filter( + $records, + static function ( $record ): bool { + return is_array( $record ); + } + ) + ); + } +} diff --git a/tests/Unit/Sync/SyncEngineTest.php b/tests/Unit/Sync/SyncEngineTest.php new file mode 100644 index 0000000..995724a --- /dev/null +++ b/tests/Unit/Sync/SyncEngineTest.php @@ -0,0 +1,292 @@ +> */ + private \ArrayObject $call_recorder; + + /** @var \ArrayObject> */ + private \ArrayObject $log_recorder; + + protected function setUp(): void { + $this->call_recorder = new \ArrayObject(); + $this->log_recorder = new \ArrayObject(); + + parent::setUp(); + } + + protected function tearDown(): void { + unset( + $GLOBALS['wpcs_test_options'], + $GLOBALS['wpcs_test_option_autoloads'], + $GLOBALS['wpcs_test_transients'], + $GLOBALS['wpcs_test_transient_expiration'] + ); + + parent::tearDown(); + } + + public function test_it_calls_handlers_in_registry_order_and_merges_results(): void { + $engine = $this->engine( + array( + $this->handler( 'media', SyncResult::success( array( 'created' => 1 ) ) ), + $this->handler( 'posts', SyncResult::success( array( 'updated' => 2 ) ) ), + $this->handler( + 'terms', + SyncResult::success( + array( + 'skipped' => 1, + 'conflicts' => 1, + ) + ) + ), + ) + ); + + $result = $engine->importPackage( $this->package() ); + + self::assertTrue( $result->isSuccessful() ); + self::assertSame( 1, $result->created() ); + self::assertSame( 2, $result->updated() ); + self::assertSame( 1, $result->skipped() ); + self::assertSame( 1, $result->conflicts() ); + $calls = $this->call_recorder->getArrayCopy(); + + self::assertSame( array( 'terms', 'posts', 'media' ), array_column( $calls, 'bucket' ) ); + self::assertSame( array( array( 'id' => 20 ) ), $calls[0]['records'] ); + self::assertSame( array( array( 'id' => 10 ) ), $calls[1]['records'] ); + self::assertSame( array( array( 'id' => 30 ) ), $calls[2]['records'] ); + self::assertSame( 'manual_review', $calls[0]['context']->conflictStrategy() ); + } + + public function test_it_saves_running_and_completed_state(): void { + $engine = $this->engine( + array( + $this->handler( 'posts', SyncResult::success( array( 'created' => 1 ) ) ), + $this->handler( 'media', SyncResult::success( array( 'created' => 1 ) ) ), + ) + ); + + $engine->importPackage( $this->package() ); + + $states = array_values( $GLOBALS['wpcs_test_transients'] ); + + self::assertCount( 1, $GLOBALS['wpcs_test_transients'] ); + self::assertSame( 'completed', $states[0]['status'] ); + self::assertSame( 2, $states[0]['processed'] ); + self::assertSame( 3, $states[0]['total'] ); + } + + public function test_it_logs_operation_start_and_completion(): void { + $engine = $this->engine( + array( + $this->handler( 'posts', SyncResult::success( array( 'created' => 1 ) ) ), + ) + ); + + $engine->importPackage( $this->package() ); + + $logs = $this->log_recorder->getArrayCopy(); + + self::assertSame( 'Starting content package import.', $logs[0]['message'] ); + self::assertSame( 'Completed content package import.', $logs[1]['message'] ); + self::assertSame( 1, $logs[1]['context']['created'] ); + } + + public function test_it_returns_failure_when_handler_throws_import_exception(): void { + $engine = $this->engine( + array( + $this->throwingHandler( 'posts' ), + $this->handler( 'media', SyncResult::success( array( 'created' => 1 ) ) ), + ) + ); + + $result = $engine->importPackage( $this->package() ); + $states = array_values( $GLOBALS['wpcs_test_transients'] ); + $logs = $this->log_recorder->getArrayCopy(); + + self::assertFalse( $result->isSuccessful() ); + self::assertSame( array( 'Posts failed.' ), $result->errors() ); + self::assertSame( 'failed', $states[0]['status'] ); + self::assertSame( 'Content package import failed.', $logs[1]['message'] ); + self::assertSame( 'posts', $logs[1]['context']['bucket'] ); + } + + /** + * @param array $handlers Handlers. + */ + private function engine( array $handlers ): SyncEngine { + update_option( + SettingsRepository::OPTION_NAME, + array( + 'conflict_strategy' => 'manual_review', + ), + false + ); + + return new SyncEngine( + new ContentHandlerRegistry( $handlers ), + new SyncStateRepository(), + new SettingsRepository(), + $this->logger() + ); + } + + private function package(): ContentPackage { + return ContentPackage::fromArray( + array( + 'source' => array( + 'site_url' => 'https://source.test', + ), + 'destination' => array( + 'site_url' => 'https://destination.test', + ), + 'manifest' => array( + 'posts' => 1, + 'terms' => 1, + 'media' => 1, + ), + 'records' => array( + 'posts' => array( + array( 'id' => 10 ), + ), + 'terms' => array( + array( 'id' => 20 ), + ), + 'media' => array( + array( 'id' => 30 ), + ), + ), + ) + ); + } + + private function handler( string $bucket, SyncResult $result ): ContentHandlerInterface { + return new class( $bucket, $result, $this->call_recorder ) implements ContentHandlerInterface { + private string $bucket; + private SyncResult $result; + + /** @var \ArrayObject> */ + private \ArrayObject $recorder; + + /** + * @param \ArrayObject> $recorder Call recorder. + */ + public function __construct( string $bucket, SyncResult $result, \ArrayObject $recorder ) { + $this->bucket = $bucket; + $this->result = $result; + $this->recorder = $recorder; + } + + public function bucket(): string { + return $this->bucket; + } + + public function importRecords( array $records, SyncContext $context ): SyncResult { + $this->recorder->append( + array( + 'bucket' => $this->bucket, + 'records' => $records, + 'context' => $context, + ) + ); + + return $this->result; + } + }; + } + + private function throwingHandler( string $bucket ): ContentHandlerInterface { + return new class( $bucket, $this->call_recorder ) implements ContentHandlerInterface { + private string $bucket; + + /** @var \ArrayObject> */ + private \ArrayObject $recorder; + + /** + * @param \ArrayObject> $recorder Call recorder. + */ + public function __construct( string $bucket, \ArrayObject $recorder ) { + $this->bucket = $bucket; + $this->recorder = $recorder; + } + + public function bucket(): string { + return $this->bucket; + } + + public function importRecords( array $records, SyncContext $context ): SyncResult { + $this->recorder->append( + array( + 'bucket' => $this->bucket, + 'records' => $records, + 'context' => $context, + ) + ); + + throw new ContentImportException( $this->bucket, array( 'id' => 10 ), 'Posts failed.' ); + } + }; + } + + private function logger(): LoggerInterface { + return new class( $this->log_recorder ) implements LoggerInterface { + /** @var \ArrayObject> */ + private \ArrayObject $recorder; + + /** + * @param \ArrayObject> $recorder Log recorder. + */ + public function __construct( \ArrayObject $recorder ) { + $this->recorder = $recorder; + } + + public function error( string $message, array $context = array() ): void { + $this->record( 'error', $message, $context ); + } + + public function warning( string $message, array $context = array() ): void { + $this->record( 'warning', $message, $context ); + } + + public function info( string $message, array $context = array() ): void { + $this->record( 'info', $message, $context ); + } + + public function debug( string $message, array $context = array() ): void { + $this->record( 'debug', $message, $context ); + } + + /** + * @param array $context Context. + */ + private function record( string $level, string $message, array $context ): void { + $this->recorder->append( + array( + 'level' => $level, + 'message' => $message, + 'context' => $context, + ) + ); + } + }; + } +}