Files
WP-Site-Sync/includes/class-sync-engine.php
2025-12-14 21:57:25 -06:00

1612 lines
56 KiB
PHP

<?php
namespace SiteSync;
use WP_Error;
// Stop immediately when ABSPATH is not defined, i.e. the file was requested
// directly instead of being loaded by WordPress.
if ( ! defined( 'ABSPATH' ) ) {
exit;
}
/**
* Handles synchronization logic between WordPress sites, including posts, terms, media, and deletions.
*/
class Sync_Engine {
/**
* Upper bound on items collected per entity type (posts, terms, media,
* tombstones) when an outbox batch is built.
*
* @var int
*/
private const BATCH_SIZE = 20;
/**
* Settings instance.
*
* @var Settings
*/
private $settings;
/**
* Transport instance.
*
* @var Transport
*/
private $transport;
/**
* State instance.
*
* @var State
*/
private $state;
/**
 * Wires the engine to its three collaborators.
 *
 * @param Settings  $settings  Source of the plugin configuration.
 * @param Transport $transport Client used to talk to the remote peer.
 * @param State     $state     Store for checkpoints, tombstones and run results.
 */
public function __construct( Settings $settings, Transport $transport, State $state ) {
	$this->state     = $state;
	$this->transport = $transport;
	$this->settings  = $settings;
}
/**
 * Registers WordPress hooks for sync operations.
 *
 * Sync triggers are custom `site_sync/*` actions. Deletions are captured via
 * core post/term hooks so a tombstone can be queued for the peer.
 *
 * @return void
 */
public function hooks(): void {
	add_action( 'site_sync/run_cycle', array( $this, 'run_cycle' ), 10, 1 );
	add_action( 'site_sync/manual_trigger', array( $this, 'run_manual' ), 10, 1 );
	add_action( 'site_sync/manual_push', array( $this, 'run_manual_push' ), 10, 1 );
	add_action( 'site_sync/manual_pull', array( $this, 'run_manual_pull' ), 10, 1 );
	add_action( 'trashed_post', array( $this, 'mark_post_tombstone' ) );
	add_action( 'before_delete_post', array( $this, 'mark_post_tombstone' ) );
	// Core fires `delete_term` only after the term's meta rows are removed, so the
	// stored external ID can no longer be read there. `pre_delete_term` runs while
	// the term and its meta still exist.
	add_action( 'pre_delete_term', array( $this, 'handle_pre_delete_term' ), 10, 2 );
}

/**
 * Adapts the `pre_delete_term` signature to mark_term_tombstone().
 *
 * @param int    $term     ID of the term about to be deleted.
 * @param string $taxonomy Taxonomy the term belongs to.
 * @return void
 */
public function handle_pre_delete_term( $term, $taxonomy ): void {
	// The term-taxonomy ID is not supplied by this hook and is unused downstream.
	$this->mark_term_tombstone( $term, 0, $taxonomy );
}
/**
 * Scheduled entry point: runs one sync pass tagged 'cron'.
 *
 * Nothing happens unless the supplied array carries a non-empty 'enabled' entry.
 *
 * @param array $settings Settings array handed over by the scheduler hook.
 */
public function run_cycle( array $settings ): void {
	$isEnabled = ! empty( $settings['enabled'] );
	if ( $isEnabled ) {
		$this->run_once( 'cron' );
	}
}
/**
 * Admin-initiated sync: runs one pass tagged 'manual' with default flags.
 *
 * @param array $settings Accepted for hook compatibility; not read.
 */
public function run_manual( $settings = array() ): void { // phpcs:ignore
	$trigger = 'manual';
	$this->run_once( $trigger );
}
/**
 * Admin-initiated push: runs one pass with only the push phase requested,
 * bypassing the enable_push / enable_pull settings.
 *
 * @param array $settings Accepted for hook compatibility; not read.
 */
public function run_manual_push( $settings = array() ): void { // phpcs:ignore
	$push            = true;
	$pull            = false;
	$respectSettings = false;
	$this->run_once( 'manual_push', $push, $pull, $respectSettings );
}
/**
 * Admin-initiated pull: runs one pass with only the pull phase requested,
 * bypassing the enable_push / enable_pull settings.
 *
 * @param array $settings Accepted for hook compatibility; not read.
 */
public function run_manual_pull( $settings = array() ): void { // phpcs:ignore
	$push            = false;
	$pull            = true;
	$respectSettings = false;
	$this->run_once( 'manual_pull', $push, $pull, $respectSettings );
}
/**
 * Executes a single synchronization pass against the configured peer.
 *
 * Sequence: resolve which directions are allowed, verify a peer URL exists,
 * handshake, push the local outbox (if allowed), pull and apply the peer's
 * outbox (if allowed), then record the result in State. Any failure records
 * the error and ends the pass early.
 *
 * @param string $source          Label for what triggered the run (e.g. 'cron', 'manual').
 * @param bool   $doPush          Whether the push phase is requested.
 * @param bool   $doPull          Whether the pull phase is requested.
 * @param bool   $respectSettings When true, a phase also requires its 'enable_push' /
 *                                'enable_pull' setting to be non-empty.
 */
public function run_once( string $source, bool $doPush = true, bool $doPull = true, bool $respectSettings = true ): void {
	$config   = $this->settings->ensure_defaults();
	$logCtx   = array( 'source' => $source );
	$sent     = array(
		'posts'      => 0,
		'terms'      => 0,
		'media'      => 0,
		'tombstones' => 0,
	);
	$received = array(
		'applied' => 0,
		'skipped' => 0,
		'errors'  => 0,
	);

	$pushOn = $doPush;
	$pullOn = $doPull;
	if ( $respectSettings ) {
		$pushOn = $pushOn && ! empty( $config['enable_push'] );
		$pullOn = $pullOn && ! empty( $config['enable_pull'] );
	}
	$mode = array(
		'push' => $pushOn,
		'pull' => $pullOn,
	);

	// Stores a run result that carries the counters gathered up to this point.
	$recordWithCounts = function ( $error ) use ( &$sent, &$received ): void {
		$this->state->record_run(
			array(
				'error'  => $error,
				'counts' => array(
					'sent'     => $sent,
					'received' => $received,
				),
			)
		);
	};

	if ( ! ( $pushOn || $pullOn ) ) {
		Log::info( 'Sync skipped; both directions disabled.', $logCtx );
		$recordWithCounts( 'Sync directions disabled' );
		return;
	}

	if ( empty( $config['peer_url'] ) ) {
		Log::warning( 'Sync skipped; peer URL not configured.', $logCtx );
		$this->state->record_run( array( 'error' => 'Peer URL not configured' ) );
		return;
	}

	$hello = $this->transport->handshake();
	if ( is_wp_error( $hello ) ) {
		$this->report_error( 'Handshake failed', $hello, $source );
		$this->state->record_run( array( 'error' => $hello->get_error_message() ) );
		return;
	}
	Log::info(
		'Handshake succeeded',
		array(
			'source' => $source,
			'peer'   => $hello['site_uuid'] ?? 'unknown',
		)
	);

	if ( ! $pushOn ) {
		Log::info( 'Push skipped for this run.', $logCtx );
	} else {
		$outbound = $this->provide_outbox( true );
		$sent     = $outbound['meta']['counts'] ?? $sent;

		// The meta block is local bookkeeping and is not sent to the peer.
		$wire = $outbound;
		unset( $wire['meta'] );

		$pushResult = $this->transport->post( '/wp-json/site-sync/v1/inbox', $wire );
		if ( is_wp_error( $pushResult ) ) {
			$this->report_error( 'Push to peer failed', $pushResult, $source );
			$recordWithCounts( $pushResult->get_error_message() );
			return;
		}
		$this->commit_outbox( $outbound['cursor'], $outbound['meta']['counts'] ?? array() );
	}

	if ( ! $pullOn ) {
		Log::info( 'Pull skipped for this run.', $logCtx );
	} else {
		$pulled = $this->transport->get( '/wp-json/site-sync/v1/outbox' );
		if ( is_wp_error( $pulled ) ) {
			$this->report_error( 'Pull from peer failed', $pulled, $source );
			$recordWithCounts( $pulled->get_error_message() );
			return;
		}
		if ( is_array( $pulled ) && ! empty( $pulled['items'] ) ) {
			$outcome = $this->handle_inbox( $pulled );
			if ( is_array( $outcome ) ) {
				$errorList = $outcome['errors'] ?? null;
				$received  = array(
					'applied' => (int) ( $outcome['applied'] ?? 0 ),
					'skipped' => (int) ( $outcome['skipped'] ?? 0 ),
					'errors'  => is_array( $errorList ) ? count( $errorList ) : 0,
				);
			}
		}
	}

	$recordWithCounts( null );
	Log::info(
		'Sync cycle completed',
		array(
			'source'   => $source,
			'mode'     => $mode,
			'sent'     => $sent,
			'received' => $received,
		)
	);
}
/**
 * Applies a batch of items received from the peer.
 *
 * Each item is routed by its 'entity' value ('post', 'term', 'media' or
 * 'delete') to the matching apply_* method. Items that are malformed or of an
 * unknown entity are counted as skipped; failures are logged and collected.
 * When at least one item was applied, the last_received checkpoints in State
 * are advanced to the last successfully applied post / term / media item.
 *
 * @param array $payload Either an array with an 'items' key or the item list itself.
 * @return array Status array with 'status', 'applied', 'skipped' and 'errors'; or
 *               'status' => 'error' plus 'message' when the items are not an array.
 */
public function handle_inbox( array $payload ): array {
	$items      = $payload['items'] ?? $payload;
	$applied    = 0;
	$skipped    = 0;
	$errors     = array();
	$postMark   = array(
		'modified' => null,
		'id'       => 0,
	);
	$termMark   = array( 'id' => 0 );
	$mediaMark  = array(
		'modified' => null,
		'id'       => 0,
	);
	// Entity name => applying method.
	$appliers   = array(
		'post'   => 'apply_post',
		'term'   => 'apply_term',
		'media'  => 'apply_media',
		'delete' => 'apply_delete',
	);
	// Entity name => log line used when applying fails.
	$failureLog = array(
		'post'   => 'Inbox post apply failed',
		'term'   => 'Inbox term apply failed',
		'media'  => 'Inbox media apply failed',
		'delete' => 'Inbox delete apply failed',
	);

	if ( ! is_array( $items ) ) {
		return array(
			'status'  => 'error',
			'message' => __( 'Invalid payload.', 'site-sync' ),
		);
	}

	foreach ( $items as $item ) {
		if ( ! is_array( $item ) || empty( $item['entity'] ) ) {
			++$skipped;
			continue;
		}

		$kind = $item['entity'];
		// Strict routing: only the exact strings listed above are accepted.
		if ( ! is_string( $kind ) || ! isset( $appliers[ $kind ] ) ) {
			++$skipped;
			continue;
		}

		$outcome = $this->{$appliers[ $kind ]}( $item );

		if ( is_wp_error( $outcome ) ) {
			$errors[] = $outcome->get_error_message();
			$context  = array(
				'code'        => $outcome->get_error_code(),
				'message'     => $outcome->get_error_message(),
				'external_id' => $item['external_id'] ?? '',
			);
			if ( 'term' === $kind ) {
				$context['taxonomy'] = $item['taxonomy'] ?? '';
			}
			if ( 'delete' === $kind ) {
				$context['type'] = $item['type'] ?? '';
			} else {
				$context['id'] = $item['id'] ?? '';
			}
			$context['data'] = $outcome->get_error_data();
			Log::error( $failureLog[ $kind ], $context );
			continue;
		}

		++$applied;

		if ( 'post' === $kind ) {
			$postMark = array(
				'modified' => $item['modified_gmt'] ?? null,
				'id'       => (int) ( $item['id'] ?? 0 ),
			);
		} elseif ( 'term' === $kind ) {
			$termMark = array(
				'id' => (int) ( $item['id'] ?? 0 ),
			);
		} elseif ( 'media' === $kind ) {
			$mediaMark = array(
				'modified' => $item['modified_gmt'] ?? null,
				'id'       => (int) ( $item['id'] ?? 0 ),
			);
		}
	}

	// Advance the last_received checkpoints only when something was applied.
	$state = $this->state->get();
	if ( $applied > 0 ) {
		if ( ! empty( $postMark['modified'] ) ) {
			$state['last_received']['posts'] = $postMark;
		}
		if ( ! empty( $termMark['id'] ) ) {
			$state['last_received']['terms'] = $termMark;
		}
		if ( ! empty( $mediaMark['modified'] ) ) {
			$state['last_received']['media'] = $mediaMark;
		}
		$this->state->update( $state );
	}

	return array(
		'status'  => empty( $errors ) ? 'ok' : 'partial',
		'applied' => $applied,
		'skipped' => $skipped,
		'errors'  => $errors,
	);
}
/**
 * Assembles the next outbound batch for the peer.
 *
 * Items are ordered terms, posts, media, then tombstones. The cursor carries
 * the per-entity checkpoints that commit_outbox() stores after a successful push.
 *
 * @param bool $withMeta When true, adds a 'meta' => 'counts' section with the number
 *                       of items per category.
 * @return array Payload with 'items', 'cursor', 'status' and optionally 'meta'.
 */
public function provide_outbox( bool $withMeta = false ): array {
	$postBatch  = $this->build_post_outbox();
	$termBatch  = $this->build_term_outbox();
	$mediaBatch = $this->build_media_outbox();
	$tombstones = $this->build_tombstones();

	$cursor = array(
		'posts' => $postBatch['cursor'],
		'terms' => $termBatch['cursor'],
		'media' => $mediaBatch['cursor'],
	);
	$items  = array_merge( $termBatch['items'], $postBatch['items'], $mediaBatch['items'], $tombstones );

	$outbox = array(
		'items'  => $items,
		'cursor' => $cursor,
		'status' => 'ok',
	);

	if ( ! $withMeta ) {
		return $outbox;
	}

	$outbox['meta'] = array(
		'counts' => array(
			'posts'      => count( $postBatch['items'] ),
			'terms'      => count( $termBatch['items'] ),
			'media'      => count( $mediaBatch['items'] ),
			'tombstones' => count( $tombstones ),
		),
	);
	return $outbox;
}
/**
 * Persists progress after an outbound batch has been delivered.
 *
 * For posts, terms and media the last_sent checkpoint is replaced only when
 * both the sent count and the cursor for that category are non-empty. Sent
 * tombstones are then consumed from State.
 *
 * @param array $cursor Per-category checkpoint cursors ('posts', 'terms', 'media').
 * @param array $counts Per-category sent counts, including 'tombstones'.
 */
public function commit_outbox( array $cursor, array $counts ): void {
	$state = $this->state->get();
	$dirty = false;

	foreach ( array( 'posts', 'terms', 'media' ) as $bucket ) {
		if ( empty( $counts[ $bucket ] ) || empty( $cursor[ $bucket ] ) ) {
			continue;
		}
		$state['last_sent'][ $bucket ] = $cursor[ $bucket ];
		$dirty                         = true;
	}

	if ( $dirty ) {
		$this->state->update( $state );
	}

	if ( empty( $counts['tombstones'] ) ) {
		return;
	}
	$this->state->consume_tombstones( (int) $counts['tombstones'] );
}
/**
 * Runs only the handshake step, for connection testing from the admin UI.
 *
 * @return mixed WP_Error when no peer URL is configured; otherwise whatever the
 *               transport's handshake() returns.
 */
public function test_handshake() {
	$config = $this->settings->ensure_defaults();
	if ( ! empty( $config['peer_url'] ) ) {
		return $this->transport->handshake();
	}
	return new WP_Error( 'site_sync_missing_peer', __( 'Peer URL not configured.', 'site-sync' ) );
}
/**
 * Accept incoming media stream (authenticated REST upload).
 *
 * Validates size (10 MiB cap), MIME type and the optional SHA-256 checksum
 * header, writes the request body to a temp file, sideloads it as an
 * attachment and tags the attachment with the sender's external ID.
 *
 * @param \WP_REST_Request $request The REST request object containing the media stream.
 * @return int|WP_Error Attachment ID on success, WP_Error on validation or storage failure.
 */
public function receive_media_stream( \WP_REST_Request $request ) {
	$externalId     = $request->get_param( 'external_id' );
	$mime           = $request->get_param( 'mime_type' );
	$filename       = $request->get_param( 'filename' ) ? $request->get_param( 'filename' ) : ( 'media-' . wp_generate_password( 6, false ) );
	$body           = $request->get_body();
	$size           = strlen( $body );
	$headerChecksum = $request->get_header( 'X-Site-Sync-Checksum' );

	if ( $size > 10 * 1024 * 1024 ) {
		return new WP_Error( 'site_sync_media_too_large', __( 'Media payload too large.', 'site-sync' ), array( 'status' => 413 ) );
	}
	$allowed = get_allowed_mime_types();
	if ( $mime && ! in_array( $mime, $allowed, true ) ) {
		return new WP_Error( 'site_sync_media_mime', __( 'MIME type not allowed.', 'site-sync' ), array( 'status' => 400 ) );
	}
	if ( ! $externalId || ! $mime || ! $body ) {
		return new WP_Error( 'site_sync_bad_media', __( 'Invalid media upload payload.', 'site-sync' ), array( 'status' => 400 ) );
	}

	// hash_equals() expects the trusted value first and the caller-supplied one second.
	$computed = hash( 'sha256', $body );
	if ( $headerChecksum && ! hash_equals( $computed, (string) $headerChecksum ) ) {
		return new WP_Error( 'site_sync_media_checksum', __( 'Checksum mismatch.', 'site-sync' ), array( 'status' => 400 ) );
	}

	require_once ABSPATH . 'wp-admin/includes/file.php';
	require_once ABSPATH . 'wp-admin/includes/image.php';
	require_once ABSPATH . 'wp-admin/includes/media.php';

	$tmp = \wp_tempnam( $filename );
	if ( ! $tmp ) {
		return new WP_Error( 'site_sync_media_tmp_fail', __( 'Could not create temp file.', 'site-sync' ), array( 'status' => 500 ) );
	}

	global $wp_filesystem;
	if ( empty( $wp_filesystem ) ) {
		WP_Filesystem();
	}
	// Filesystem initialisation can fail and the write itself can fail; without this
	// check an empty temp file would be sideloaded (or a call on null would fatal).
	if ( empty( $wp_filesystem ) || ! $wp_filesystem->put_contents( $tmp, $body, FS_CHMOD_FILE ) ) {
		wp_delete_file( $tmp );
		return new WP_Error( 'site_sync_media_write_fail', __( 'Could not write temp file.', 'site-sync' ), array( 'status' => 500 ) );
	}

	$safeName = $this->normalize_media_filename( $filename, $mime );
	$file     = array(
		'name'     => $safeName,
		'tmp_name' => $tmp,
		'type'     => $mime,
	);

	$filters = $this->temporarily_allow_mime( $mime, pathinfo( $safeName, PATHINFO_EXTENSION ) );
	try {
		$attachmentId = \media_handle_sideload(
			$file,
			0,
			'',
			array(
				'post_mime_type' => $mime,
				'post_status'    => 'inherit',
			)
		);
	} finally {
		// Always undo the temporary MIME allowance, even if the sideload throws.
		$this->remove_temporary_mime_filters( $filters );
	}

	// Remove the temp file in both outcomes.
	wp_delete_file( $tmp );

	if ( is_wp_error( $attachmentId ) ) {
		return $attachmentId;
	}
	update_post_meta( $attachmentId, SITE_SYNC_META_MEDIA_EXTERNAL_ID, $externalId );
	return $attachmentId;
}
/**
 * Collects the next batch of posts to send to the peer.
 *
 * Starting from the last_sent posts checkpoint, selects up to BATCH_SIZE posts
 * of the supported types whose (post_modified_gmt, ID) lies after that
 * checkpoint, and serializes each one.
 *
 * @return array 'items' (serialized posts) and 'cursor' (checkpoint of the last
 *               item, or the unchanged checkpoint when nothing was selected).
 */
private function build_post_outbox(): array {
	$state      = $this->state->get();
	$checkpoint = $state['last_sent']['posts'] ?? array(
		'modified' => null,
		'id'       => 0,
	);

	$queryArgs = array(
		'post_type'      => $this->get_supported_post_types(),
		'post_status'    => array( 'publish', 'pending', 'draft', 'future', 'private' ),
		'posts_per_page' => self::BATCH_SIZE,
		'orderby'        => array(
			'modified' => 'ASC',
			'ID'       => 'ASC',
		),
		'fields'         => 'all',
		'no_found_rows'  => true,
	);

	// With a checkpoint present, narrow the query to rows strictly after it.
	$afterCheckpoint = null;
	if ( ! empty( $checkpoint['modified'] ) ) {
		$sinceModified   = $checkpoint['modified'];
		$sinceId         = (int) ( $checkpoint['id'] ?? 0 );
		$afterCheckpoint = function ( $where ) use ( $sinceModified, $sinceId ) {
			global $wpdb;
			return $where . $wpdb->prepare(
				" AND ( ( $wpdb->posts.post_modified_gmt > %s ) OR ( $wpdb->posts.post_modified_gmt = %s AND $wpdb->posts.ID > %d ) )",
				$sinceModified,
				$sinceModified,
				$sinceId
			);
		};
		add_filter( 'posts_where', $afterCheckpoint );
	}

	$query = new \WP_Query( $queryArgs );

	// The filter must only affect the query above.
	if ( $afterCheckpoint ) {
		remove_filter( 'posts_where', $afterCheckpoint );
	}

	$config = $this->settings->ensure_defaults();
	$batch  = array();
	$cursor = $checkpoint;

	foreach ( $query->posts as $post ) {
		$postId  = (int) $post->ID;
		$batch[] = array(
			'entity'       => 'post',
			'external_id'  => $this->ensure_post_external_id( $post->ID ),
			'site_uuid'    => $config['site_uuid'],
			'id'           => $postId,
			'modified_gmt' => $post->post_modified_gmt,
			'data'         => $this->serialize_post( $post ),
		);
		$cursor  = array(
			'modified' => $post->post_modified_gmt,
			'id'       => $postId,
		);
	}

	return array(
		'items'  => $batch,
		'cursor' => $cursor,
	);
}
/**
 * Builds the outbox payload for terms to be synchronized.
 *
 * Selects up to BATCH_SIZE terms of the supported taxonomies whose term_id is
 * greater than the last_sent terms checkpoint, ordered by term_id ascending.
 * The cursor condition is applied in SQL (via the `terms_clauses` filter) so
 * that batches after the first one actually advance; filtering in PHP after a
 * LIMITed query would keep returning the same first page.
 *
 * @return array Outbox data for terms including items and cursor.
 */
private function build_term_outbox(): array {
	$state      = $this->state->get();
	$checkpoint = $state['last_sent']['terms'] ?? array( 'id' => 0 );
	$lastId     = (int) ( $checkpoint['id'] ?? 0 );
	$taxonomies = $this->get_supported_taxonomies();
	$nothing    = array(
		'items'  => array(),
		'cursor' => $checkpoint,
	);
	if ( empty( $taxonomies ) ) {
		return $nothing;
	}

	// Restrict the query to terms after the checkpoint before the LIMIT is applied.
	$filter = null;
	if ( $lastId > 0 ) {
		$filter = static function ( $clauses ) use ( $lastId ) {
			global $wpdb;
			$condition        = $wpdb->prepare( 't.term_id > %d', $lastId );
			$clauses['where'] = empty( $clauses['where'] ) ? $condition : $clauses['where'] . ' AND ' . $condition;
			return $clauses;
		};
		add_filter( 'terms_clauses', $filter );
	}

	$terms = get_terms(
		array(
			'taxonomy'   => $taxonomies,
			'hide_empty' => false,
			'orderby'    => 'term_id',
			'order'      => 'ASC',
			'number'     => self::BATCH_SIZE,
			'fields'     => 'all',
			'offset'     => 0,
		)
	);

	if ( $filter ) {
		remove_filter( 'terms_clauses', $filter );
	}

	// get_terms() may return a WP_Error (e.g. invalid taxonomy); send nothing in that case.
	if ( is_wp_error( $terms ) || ! is_array( $terms ) ) {
		return $nothing;
	}

	$items      = array();
	$lastCursor = $checkpoint;
	$settings   = $this->settings->ensure_defaults();
	foreach ( $terms as $term ) {
		$termId = (int) $term->term_id;
		// Defensive: never resend a term at or before the checkpoint.
		if ( $termId <= $lastId ) {
			continue;
		}
		$extId      = $this->ensure_term_external_id( $termId );
		$parentExt  = $term->parent ? $this->ensure_term_external_id( (int) $term->parent ) : null;
		$parentSlug = $term->parent ? get_term_field( 'slug', (int) $term->parent, $term->taxonomy ) : null;
		$items[]    = array(
			'entity'             => 'term',
			'id'                 => $termId,
			'external_id'        => $extId,
			'taxonomy'           => $term->taxonomy,
			'slug'               => $term->slug,
			'name'               => $term->name,
			'description'        => $term->description,
			'parent_external_id' => $parentExt,
			'parent_slug'        => $parentSlug,
			'site_uuid'          => $settings['site_uuid'],
		);
		$lastCursor = array( 'id' => $termId );
	}

	return array(
		'items'  => $items,
		'cursor' => $lastCursor,
	);
}
/**
 * Collects the next batch of attachments to send to the peer.
 *
 * Starting from the last_sent media checkpoint, selects up to BATCH_SIZE
 * attachments whose (post_modified_gmt, ID) lies after that checkpoint. Each
 * item carries descriptive fields, attachment metadata and a SHA-256 checksum
 * of the file when it exists on disk (null otherwise).
 *
 * @return array 'items' (media descriptors) and 'cursor' (checkpoint of the last
 *               item, or the unchanged checkpoint when nothing was selected).
 */
private function build_media_outbox(): array {
	$state      = $this->state->get();
	$checkpoint = $state['last_sent']['media'] ?? array(
		'modified' => null,
		'id'       => 0,
	);

	$queryArgs = array(
		'post_type'      => 'attachment',
		'post_status'    => 'inherit',
		'posts_per_page' => self::BATCH_SIZE,
		'orderby'        => array(
			'modified' => 'ASC',
			'ID'       => 'ASC',
		),
		'fields'         => 'all',
		'no_found_rows'  => true,
	);

	// With a checkpoint present, narrow the query to rows strictly after it.
	$afterCheckpoint = null;
	if ( ! empty( $checkpoint['modified'] ) ) {
		$sinceModified   = $checkpoint['modified'];
		$sinceId         = (int) ( $checkpoint['id'] ?? 0 );
		$afterCheckpoint = function ( $where ) use ( $sinceModified, $sinceId ) {
			global $wpdb;
			return $where . $wpdb->prepare(
				" AND ( ( $wpdb->posts.post_modified_gmt > %s ) OR ( $wpdb->posts.post_modified_gmt = %s AND $wpdb->posts.ID > %d ) )",
				$sinceModified,
				$sinceModified,
				$sinceId
			);
		};
		add_filter( 'posts_where', $afterCheckpoint );
	}

	$query = new \WP_Query( $queryArgs );

	// The filter must only affect the query above.
	if ( $afterCheckpoint ) {
		remove_filter( 'posts_where', $afterCheckpoint );
	}

	$config = $this->settings->ensure_defaults();
	$batch  = array();
	$cursor = $checkpoint;

	foreach ( $query->posts as $attachment ) {
		$attachmentId = (int) $attachment->ID;
		$externalId   = $this->ensure_media_external_id( $attachment->ID );
		$metadata     = wp_get_attachment_metadata( $attachment->ID );
		$diskPath     = get_attached_file( $attachment->ID );

		$checksum = null;
		if ( $diskPath && file_exists( $diskPath ) ) {
			$checksum = hash_file( 'sha256', $diskPath );
		}

		$batch[] = array(
			'entity'       => 'media',
			'external_id'  => $externalId,
			'site_uuid'    => $config['site_uuid'],
			'id'           => $attachmentId,
			'modified_gmt' => $attachment->post_modified_gmt,
			'data'         => array(
				'post_title' => $attachment->post_title,
				'post_name'  => $attachment->post_name,
				'mime_type'  => $attachment->post_mime_type,
				'guid'       => $attachment->guid,
				'filename'   => basename( get_attached_file( $attachment->ID ) ),
				'checksum'   => $checksum,
				'meta'       => $metadata,
			),
		);
		$cursor  = array(
			'modified' => $attachment->post_modified_gmt,
			'id'       => $attachmentId,
		);
	}

	return array(
		'items'  => $batch,
		'cursor' => $cursor,
	);
}
/**
 * Reads, without consuming, up to BATCH_SIZE queued tombstones from State.
 *
 * @return array List of tombstone (delete) items.
 */
private function build_tombstones(): array {
	$limit = self::BATCH_SIZE;
	return $this->state->peek_tombstones( $limit );
}
/**
 * Converts a post into the plain array shape carried in a sync item's 'data'.
 *
 * Includes the core post fields plus the post's term slugs (grouped by
 * taxonomy) and its whitelisted meta.
 *
 * @param \WP_Post $post The post object to serialize.
 * @return array The serialized post data.
 */
private function serialize_post( \WP_Post $post ): array {
	$fields = array(
		'post_type'         => $post->post_type,
		'post_status'       => $post->post_status,
		'post_title'        => $post->post_title,
		'post_content'      => $post->post_content,
		'post_excerpt'      => $post->post_excerpt,
		'post_name'         => $post->post_name,
		'post_parent'       => (int) $post->post_parent,
		'post_date_gmt'     => $post->post_date_gmt,
		'post_modified_gmt' => $post->post_modified_gmt,
	);

	$fields['terms'] = $this->get_post_terms( $post->ID );
	$fields['meta']  = $this->get_whitelisted_meta( $post->ID );

	return $fields;
}
/**
 * Lists the term slugs attached to a post, keyed by taxonomy.
 *
 * Taxonomies with no terms, or whose lookup fails, are left out.
 *
 * @param int $postId The ID of the post.
 * @return array Map of taxonomy name => list of term slugs.
 */
private function get_post_terms( int $postId ): array {
	$postType = get_post_type( $postId );
	$grouped  = array();

	foreach ( get_object_taxonomies( $postType ) as $taxonomy ) {
		$slugs = wp_get_object_terms( $postId, $taxonomy, array( 'fields' => 'slugs' ) );
		if ( is_wp_error( $slugs ) || empty( $slugs ) ) {
			continue;
		}
		$grouped[ $taxonomy ] = $slugs;
	}

	return $grouped;
}
/**
 * Retrieves whitelisted meta fields for a given post.
 *
 * The key list comes from Settings and can be adjusted through the
 * `site_sync/post_meta_keys` filter. Keys whose stored value is an empty
 * string are omitted.
 *
 * @param int $postId The ID of the post.
 * @return array Array of whitelisted meta key-value pairs.
 */
private function get_whitelisted_meta( int $postId ): array {
	$keys = $this->settings->get_post_meta_keys();
	$keys = apply_filters( 'site_sync/post_meta_keys', $keys );
	$meta = array();
	// A filter callback may return a non-array; treat that as a (possibly empty) list.
	foreach ( (array) $keys as $key ) {
		// get_post_meta() with $single = true already returns the unserialized value.
		// Unserializing again would alter strings that merely look serialized and
		// would run unserialize() on stored content a second time.
		$value = get_post_meta( $postId, $key, true );
		if ( $value !== '' ) {
			$meta[ $key ] = $value;
		}
	}
	return $meta;
}
/**
 * Returns the post's external (cross-site) ID, creating one on first use.
 *
 * A newly generated UUID v4 is stored in post meta under SITE_SYNC_META_EXTERNAL_ID.
 *
 * @param int $postId The ID of the post.
 * @return string The external ID for the post.
 */
private function ensure_post_external_id( int $postId ): string {
	$stored = get_post_meta( $postId, SITE_SYNC_META_EXTERNAL_ID, true );
	if ( ! $stored ) {
		$fresh = wp_generate_uuid4();
		update_post_meta( $postId, SITE_SYNC_META_EXTERNAL_ID, $fresh );
		return $fresh;
	}
	return (string) $stored;
}
/**
 * Applies a post item from the sync payload to the local site.
 *
 * The local post is located by external ID, falling back to a slug lookup
 * within the incoming post type. When the local copy has a strictly newer
 * post_modified_gmt than the incoming item the item is left unapplied and
 * true is returned. Otherwise the post is inserted or updated, its external
 * ID mapping is stored, and the incoming terms and meta are written.
 *
 * @param array $item The post item data from the sync payload.
 * @return true|WP_Error True on success (including "local newer, skipped"), WP_Error on failure.
 */
private function apply_post( array $item ) {
	$externalId       = $item['external_id'] ?? '';
	$data             = $item['data'] ?? array();
	$modifiedIncoming = $item['modified_gmt'] ?? '';
	if ( ! $externalId || empty( $data['post_type'] ) ) {
		return new WP_Error( 'site_sync_invalid_item', __( 'Missing external ID or post type.', 'site-sync' ) );
	}

	$existingId = $this->find_post_by_external_id( $externalId );
	if ( ! $existingId && ! empty( $data['post_name'] ) ) {
		$bySlug = get_page_by_path( $data['post_name'], OBJECT, array( $data['post_type'] ) );
		if ( $bySlug && ! is_wp_error( $bySlug ) ) {
			$existingId = (int) $bySlug->ID;
		}
	}

	if ( $existingId ) {
		$current = get_post( $existingId );
		if ( $current ) {
			$currentModified = $current->post_modified_gmt;
			// Conflict resolution: last modified wins (MySQL datetimes compare correctly as strings).
			if ( $currentModified && $modifiedIncoming && $currentModified > $modifiedIncoming ) {
				return true; // local newer, skip
			}
		}
	}

	// NOTE(review): post_parent is the sender's local ID and is written as-is — confirm
	// whether parent IDs are expected to match across sites.
	// NOTE(review): core appears to overwrite post_modified(_gmt) on updates, so the
	// value passed here may not be stored — confirm.
	$postArr = array(
		'ID'                => $existingId ? $existingId : 0,
		'post_type'         => $data['post_type'],
		'post_status'       => $data['post_status'] ?? 'draft',
		'post_title'        => $data['post_title'] ?? '',
		'post_content'      => $data['post_content'] ?? '',
		'post_excerpt'      => $data['post_excerpt'] ?? '',
		'post_name'         => $data['post_name'] ?? '',
		'post_parent'       => isset( $data['post_parent'] ) ? (int) $data['post_parent'] : 0,
		'post_date_gmt'     => $data['post_date_gmt'] ?? null,
		'post_modified_gmt' => $modifiedIncoming ? $modifiedIncoming : current_time( 'mysql', true ),
	);
	$postId  = $existingId ? wp_update_post( wp_slash( $postArr ), true ) : wp_insert_post( wp_slash( $postArr ), true );
	if ( is_wp_error( $postId ) ) {
		return $postId;
	}

	// Ensure external ID mapping is stored.
	update_post_meta( $postId, SITE_SYNC_META_EXTERNAL_ID, $externalId );

	// Taxonomy terms: replace the post's terms in every taxonomy that exists locally.
	if ( ! empty( $data['terms'] ) && is_array( $data['terms'] ) ) {
		foreach ( $data['terms'] as $tax => $slugs ) {
			if ( ! taxonomy_exists( $tax ) ) {
				continue;
			}
			wp_set_object_terms( $postId, $slugs, $tax, false );
		}
	}

	// Meta merge: override the keys present in the payload, leave others untouched.
	if ( ! empty( $data['meta'] ) && is_array( $data['meta'] ) ) {
		foreach ( $data['meta'] as $key => $value ) {
			// update_post_meta() unslashes its input; slash first so backslashes in
			// the incoming value are stored intact.
			update_post_meta( $postId, $key, wp_slash( $value ) );
		}
	}
	return true;
}
/**
 * Finds a post ID by its external ID meta value.
 *
 * Searches every registered post type. The 'any' shortcut is deliberately
 * avoided because it leaves out post types registered with
 * exclude_from_search, which would make synced posts of those types
 * unmatchable and lead to duplicates on every update.
 *
 * @param string $externalId The external ID to search for.
 * @return int The post ID if found, or 0 if not found.
 */
private function find_post_by_external_id( string $externalId ): int {
	$posts = get_posts(
		array(
			'post_type'      => array_values( get_post_types( array(), 'names' ) ),
			'meta_key'       => SITE_SYNC_META_EXTERNAL_ID,
			'meta_value'     => $externalId, // phpcs:ignore
			'posts_per_page' => 1,
			'fields'         => 'ids',
			'post_status'    => array( 'publish', 'pending', 'draft', 'future', 'private', 'trash' ),
		)
	);
	return $posts ? (int) $posts[0] : 0;
}
/**
 * Lists the post types that take part in post synchronization.
 *
 * These are all post types registered with show_ui, minus attachment,
 * revision and nav_menu_item.
 *
 * @return array List of supported post type names.
 */
private function get_supported_post_types(): array {
	$withUi = get_post_types( array( 'show_ui' => true ), 'names' );

	foreach ( array( 'attachment', 'revision', 'nav_menu_item' ) as $excluded ) {
		unset( $withUi[ $excluded ] );
	}

	return array_values( $withUi );
}
/**
 * Returns the term's external (cross-site) ID, creating one on first use.
 *
 * A newly generated UUID v4 is stored in term meta under SITE_SYNC_META_TERM_EXTERNAL_ID.
 *
 * @param int $termId The ID of the term.
 * @return string The external ID for the term.
 */
private function ensure_term_external_id( int $termId ): string {
	$stored = get_term_meta( $termId, SITE_SYNC_META_TERM_EXTERNAL_ID, true );
	if ( ! $stored ) {
		$fresh = wp_generate_uuid4();
		update_term_meta( $termId, SITE_SYNC_META_TERM_EXTERNAL_ID, $fresh );
		return $fresh;
	}
	return (string) $stored;
}
/**
 * Looks up a term in one taxonomy by its external ID meta value.
 *
 * @param string $externalId The external ID to search for.
 * @param string $taxonomy   The taxonomy to search within.
 * @return int The term ID if found, or 0 if not found or the lookup failed.
 */
private function find_term_by_external_id( string $externalId, string $taxonomy ) {
	$byExternalId = array(
		array(
			'key'   => SITE_SYNC_META_TERM_EXTERNAL_ID,
			'value' => $externalId,
		),
	);
	$matches      = get_terms(
		array(
			'taxonomy'   => $taxonomy,
			'hide_empty' => false,
			'meta_query' => $byExternalId,
			'fields'     => 'ids',
			'number'     => 1,
		)
	);

	$found = ! is_wp_error( $matches ) && ! empty( $matches );
	return $found ? (int) $matches[0] : 0;
}
/**
 * Applies a term item from the sync payload to the local site.
 *
 * The local term is located by external ID, falling back to slug. The parent
 * is resolved the same way (external ID first, then slug). An existing term
 * is updated, including its name when the payload supplies one; otherwise a
 * new term is inserted. The external ID mapping is stored in term meta.
 *
 * @param array $item The term item data from the sync payload.
 * @return true|WP_Error True on success, WP_Error on failure.
 */
private function apply_term( array $item ) {
	$externalId = $item['external_id'] ?? '';
	$taxonomy   = $item['taxonomy'] ?? '';
	$slug       = $item['slug'] ?? '';
	$name       = $item['name'] ?? $slug;
	if ( ! $externalId || ! $taxonomy || ! $slug ) {
		return new WP_Error( 'site_sync_invalid_term', __( 'Missing term data.', 'site-sync' ) );
	}
	if ( ! taxonomy_exists( $taxonomy ) ) {
		// translators: %s: taxonomy name.
		return new WP_Error( 'site_sync_unknown_taxonomy', sprintf( __( 'Taxonomy %s does not exist.', 'site-sync' ), $taxonomy ) );
	}

	$existingId = $this->find_term_by_external_id( $externalId, $taxonomy );
	if ( ! $existingId ) {
		$bySlug     = get_term_by( 'slug', $slug, $taxonomy );
		$existingId = $bySlug ? (int) $bySlug->term_id : 0;
	}

	$parentId = 0;
	if ( ! empty( $item['parent_external_id'] ) ) {
		$parentId = $this->find_term_by_external_id( (string) $item['parent_external_id'], $taxonomy );
	}
	if ( ! $parentId && ! empty( $item['parent_slug'] ) ) {
		$parent = get_term_by( 'slug', $item['parent_slug'], $taxonomy );
		if ( $parent && ! is_wp_error( $parent ) ) {
			$parentId = (int) $parent->term_id;
		}
	}

	$args = array(
		'slug'        => $slug,
		'description' => $item['description'] ?? '',
		'parent'      => $parentId,
	);

	if ( $existingId ) {
		// Propagate renames: without 'name' an updated term would keep its old label.
		// Only done when the payload actually carries a name, so the slug fallback
		// never overwrites an existing label.
		if ( ! empty( $item['name'] ) ) {
			$args['name'] = $item['name'];
		}
		$result = wp_update_term( $existingId, $taxonomy, $args );
	} else {
		$result = wp_insert_term( $name, $taxonomy, $args );
	}
	if ( is_wp_error( $result ) ) {
		return $result;
	}

	$termId = (int) $result['term_id'];
	update_term_meta( $termId, SITE_SYNC_META_TERM_EXTERNAL_ID, $externalId );
	return true;
}
/**
 * Lists the taxonomies that take part in term synchronization: all
 * taxonomies registered as public.
 *
 * @return array List of supported taxonomy names.
 */
private function get_supported_taxonomies(): array {
	$publicOnly = array( 'public' => true );
	return array_values( get_taxonomies( $publicOnly, 'names' ) );
}
/**
 * Returns the attachment's external (cross-site) ID, creating one on first use.
 *
 * A newly generated UUID v4 is stored in post meta under SITE_SYNC_META_MEDIA_EXTERNAL_ID.
 *
 * @param int $attachmentId The ID of the media attachment.
 * @return string The external ID for the media attachment.
 */
private function ensure_media_external_id( int $attachmentId ): string {
	$stored = get_post_meta( $attachmentId, SITE_SYNC_META_MEDIA_EXTERNAL_ID, true );
	if ( ! $stored ) {
		$fresh = wp_generate_uuid4();
		update_post_meta( $attachmentId, SITE_SYNC_META_MEDIA_EXTERNAL_ID, $fresh );
		return $fresh;
	}
	return (string) $stored;
}
/**
 * Looks up an attachment by its external ID meta value.
 *
 * @param string $externalId The external ID to search for.
 * @return int The attachment ID if found, or 0 if not found.
 */
private function find_media_by_external_id( string $externalId ) {
	$criteria = array(
		'post_type'      => 'attachment',
		'meta_key'       => SITE_SYNC_META_MEDIA_EXTERNAL_ID,
		'meta_value'     => $externalId, // phpcs:ignore
		'posts_per_page' => 1,
		'fields'         => 'ids',
		'post_status'    => 'inherit',
	);
	$matches  = get_posts( $criteria );

	if ( ! $matches ) {
		return 0;
	}
	return (int) $matches[0];
}
/**
 * Marks a post or media item for deletion by enqueuing a tombstone.
 *
 * Nothing is enqueued when the deletion is itself the result of applying a
 * remote tombstone, or when the post type is not one the engine syncs
 * (attachments are always eligible). The second rule keeps revisions,
 * nav menu items and similar internal posts from filling the tombstone
 * queue, since the deletion hook also fires for those.
 *
 * @param int $postId The ID of the post or media to mark as deleted.
 * @return void
 */
public function mark_post_tombstone( int $postId ): void {
	// Skip enqueue if this is already a remote tombstone application. Checked first
	// so no meta is written for a deletion that will not be reported anyway.
	if ( did_action( 'site_sync/applying_delete' ) ) {
		return;
	}
	$post = get_post( $postId );
	if ( ! $post ) {
		return;
	}

	$isMedia = 'attachment' === $post->post_type;
	if ( ! $isMedia && ! in_array( $post->post_type, $this->get_supported_post_types(), true ) ) {
		return;
	}

	$type       = $isMedia ? 'media' : 'post';
	$externalId = $isMedia
		? $this->ensure_media_external_id( $postId )
		: $this->ensure_post_external_id( $postId );

	$this->state->enqueue_tombstone(
		array(
			'entity'      => 'delete',
			'type'        => $type,
			'external_id' => $externalId,
			'id'          => $postId,
		)
	);
}
/**
 * Queues a tombstone for a term that is being deleted.
 *
 * Nothing is queued when the term or taxonomy is empty, or when the deletion
 * is the result of applying a remote tombstone.
 *
 * NOTE(review): the external ID is read from term meta; if this callback runs
 * after core has already removed that meta, a fresh ID is generated instead and
 * the peer cannot match it — confirm which hook invokes this.
 *
 * @param int|\WP_Term $term     The term object or ID to mark as deleted.
 * @param int          $tt_id    Term taxonomy ID (unused).
 * @param string       $taxonomy The taxonomy of the term.
 *
 * @return void
 */
public function mark_term_tombstone( $term, $tt_id, $taxonomy ): void {
	if ( ! ( $taxonomy && $term ) ) {
		return;
	}

	if ( is_object( $term ) ) {
		$termId = (int) $term->term_id;
	} else {
		$termId = (int) $term;
	}
	$externalId = $this->ensure_term_external_id( $termId );

	// Deletions triggered by a remote tombstone must not be echoed back.
	if ( did_action( 'site_sync/applying_delete' ) ) {
		return;
	}

	$tombstone = array(
		'entity'      => 'delete',
		'type'        => 'term',
		'taxonomy'    => $taxonomy,
		'external_id' => $externalId,
		'id'          => $termId,
	);
	$this->state->enqueue_tombstone( $tombstone );
}
/**
 * Retrieves the media file and its metadata by external ID.
 *
 * The file is read once and the SHA-256 checksum is computed from those same
 * bytes, so the returned checksum always matches the returned contents.
 *
 * @param string $externalId The external ID of the media attachment.
 * @return array|WP_Error Array with 'path', 'mime', 'filename', 'contents' and 'checksum';
 *                        WP_Error (404) when the attachment or file is missing, or
 *                        WP_Error (500) when the file cannot be read.
 */
public function get_media_file( string $externalId ) {
	$id = $this->find_media_by_external_id( $externalId );
	if ( ! $id ) {
		return new WP_Error( 'site_sync_media_not_found', __( 'Media not found for external ID.', 'site-sync' ), array( 'status' => 404 ) );
	}
	$path = get_attached_file( $id );
	if ( ! $path || ! file_exists( $path ) ) {
		return new WP_Error( 'site_sync_media_missing_file', __( 'Media file not found on disk.', 'site-sync' ), array( 'status' => 404 ) );
	}

	$storedMime = get_post_mime_type( $id );
	$mime       = $storedMime ? $storedMime : 'application/octet-stream';

	global $wp_filesystem;
	if ( empty( $wp_filesystem ) ) {
		require_once ABSPATH . 'wp-admin/includes/file.php';
		WP_Filesystem();
	}
	// Filesystem initialisation can fail, leaving the global unset.
	if ( empty( $wp_filesystem ) ) {
		return new WP_Error( 'site_sync_media_read_fail', __( 'Could not read media file.', 'site-sync' ), array( 'status' => 500 ) );
	}

	$contents = $wp_filesystem->get_contents( $path );
	if ( false === $contents ) {
		return new WP_Error( 'site_sync_media_read_fail', __( 'Could not read media file.', 'site-sync' ), array( 'status' => 500 ) );
	}

	return array(
		'path'     => $path,
		'mime'     => $mime,
		'filename' => basename( $path ),
		'contents' => $contents,
		'checksum' => hash( 'sha256', $contents ),
	);
}
/**
 * Applies a media item from the sync payload to the local site.
 *
 * The file is fetched from the peer's media endpoint first and, failing that,
 * from the item's GUID URL. A download is checked against the peer-supplied
 * SHA-256 checksum when one is present and is then sideloaded as an attachment.
 * When no file can be obtained, a file-less placeholder attachment is written
 * instead. An existing local attachment whose modification time is newer than
 * the incoming item is left untouched.
 *
 * @param array $item The media item data from the sync payload.
 * @return true|WP_Error True on success, WP_Error on failure.
 */
private function apply_media( array $item ) {
	global $wp_filesystem;

	$externalId       = $item['external_id'] ?? '';
	$data             = $item['data'] ?? array();
	$modifiedIncoming = $item['modified_gmt'] ?? '';

	// Reject malformed payloads up front so the string-typed helpers below never
	// receive a non-string MIME type or a non-array data bag.
	if ( ! is_array( $data ) ) {
		$data = array();
	}
	$mime = ( isset( $data['mime_type'] ) && is_string( $data['mime_type'] ) ) ? $data['mime_type'] : '';
	if ( ! $externalId || ! is_scalar( $externalId ) || ! $mime ) {
		return new WP_Error( 'site_sync_invalid_media', __( 'Missing media data.', 'site-sync' ) );
	}
	$externalId = (string) $externalId;

	$existingId = $this->find_media_by_external_id( $externalId );
	if ( $existingId ) {
		$current = get_post( $existingId );
		if ( $current && $current->post_modified_gmt && $modifiedIncoming && $current->post_modified_gmt > $modifiedIncoming ) {
			return true; // Local copy is newer; keep it.
		}
	}

	// Provides wp_tempnam() and WP_Filesystem().
	require_once ABSPATH . 'wp-admin/includes/file.php';

	// Step 1: obtain the file body. Try the authenticated peer endpoint first,
	// then fall back to fetching the GUID URL.
	$body             = '';
	$tempSeed         = '';
	$rawName          = '';
	$expectedChecksum = null;

	$fetched = $this->transport->fetch_media_by_external_id( $externalId );
	if ( ! is_wp_error( $fetched ) && ! empty( $fetched['body'] ) ) {
		$body             = $fetched['body'];
		$tempSeed         = $externalId;
		$rawName          = $data['filename'] ?? $data['post_name'] ?? ( 'media-' . substr( $externalId, 0, 8 ) );
		$expectedChecksum = $fetched['checksum'] ?? null;
	} else {
		$guid = $data['guid'] ?? '';
		if ( is_string( $guid ) && $guid && wp_http_validate_url( $guid ) ) {
			// The URL comes from the payload, so use the "safe" variant, which also
			// validates redirect targets.
			$response = wp_safe_remote_get( $guid, array( 'timeout' => 30 ) );
			if ( ! is_wp_error( $response ) && 200 === wp_remote_retrieve_response_code( $response ) ) {
				$body     = wp_remote_retrieve_body( $response );
				$tempSeed = $guid;
				// Name the file after the URL path only, so a query string or
				// fragment can never end up in the filename or extension.
				$urlPath = wp_parse_url( $guid, PHP_URL_PATH );
				$rawName = ( is_string( $urlPath ) && '' !== $urlPath ) ? basename( $urlPath ) : 'sync-media';
			}
		}
	}
	$rawName = is_scalar( $rawName ) ? (string) $rawName : '';

	// Step 2: write the body to a temp file and describe it for the sideload API.
	$tmp  = null;
	$file = null;
	if ( ! empty( $body ) ) {
		$tmp = \wp_tempnam( $tempSeed );
		if ( $tmp ) {
			if ( empty( $wp_filesystem ) ) {
				WP_Filesystem();
			}
			// FS_CHMOD_FILE is only defined once WP_Filesystem() has connected.
			$canWrite = ! empty( $wp_filesystem ) && defined( 'FS_CHMOD_FILE' );
			if ( $canWrite && $wp_filesystem->put_contents( $tmp, $body, FS_CHMOD_FILE ) ) {
				$ext  = $this->determine_extension( $mime, $rawName );
				$file = array(
					'name'     => $this->normalize_media_filename( $rawName, $mime, $ext ),
					'tmp_name' => $tmp,
					'type'     => $mime,
				);
			}
		}
	}

	// Step 3: verify the download against the peer's checksum, when it sent one.
	if ( $file && is_string( $expectedChecksum ) && '' !== $expectedChecksum ) {
		$downloadChecksum = hash_file( 'sha256', $tmp );
		if ( $downloadChecksum && ! hash_equals( $expectedChecksum, $downloadChecksum ) ) {
			wp_delete_file( $tmp );
			return new WP_Error( 'site_sync_media_checksum', __( 'Media checksum mismatch.', 'site-sync' ) );
		}
	}

	// Step 4: create or update the attachment.
	if ( ! $file ) {
		// No file available: write a placeholder attachment post without a file.
		$attachment   = array(
			'ID'                => $existingId ? $existingId : 0,
			// Without an explicit type wp_insert_post() would create a regular post.
			'post_type'         => 'attachment',
			'post_title'        => $data['post_title'] ?? ( $data['post_name'] ?? 'media' ),
			'post_name'         => $data['post_name'] ?? '',
			'post_mime_type'    => $mime,
			'post_status'       => 'inherit',
			'post_modified_gmt' => $modifiedIncoming ? $modifiedIncoming : current_time( 'mysql', true ),
			'post_date_gmt'     => $data['post_date_gmt'] ?? null,
		);
		$attachmentId = $existingId
			? wp_update_post( wp_slash( $attachment ), true )
			: wp_insert_post( wp_slash( $attachment ), true );
	} else {
		require_once ABSPATH . 'wp-admin/includes/image.php';
		require_once ABSPATH . 'wp-admin/includes/media.php';
		// NOTE(review): media_handle_sideload() always inserts a new attachment, so
		// when $existingId is set this adds a second attachment carrying the same
		// external ID instead of replacing the existing file. Confirm whether an
		// in-place replacement is wanted here.
		$filters = $this->temporarily_allow_mime( $mime, $this->determine_extension( $mime, $file['name'] ) );
		try {
			$attachmentId = \media_handle_sideload(
				$file,
				0,
				$data['post_title'] ?? '',
				array(
					'post_name'         => $data['post_name'] ?? '',
					'post_mime_type'    => $mime,
					'post_modified_gmt' => $modifiedIncoming ? $modifiedIncoming : current_time( 'mysql', true ),
					'post_date_gmt'     => $data['post_date_gmt'] ?? null,
				)
			);
		} finally {
			// Never leave the relaxed MIME checks installed, whatever happened above.
			$this->remove_temporary_mime_filters( $filters );
		}
	}

	// A successful sideload moves the temp file away; on any other outcome it is
	// still on disk, so remove it before returning on either path.
	if ( $tmp && file_exists( $tmp ) ) {
		wp_delete_file( $tmp );
	}

	if ( is_wp_error( $attachmentId ) ) {
		Log::error(
			'Media sideload failed',
			array(
				'code'        => $attachmentId->get_error_code(),
				'message'     => $attachmentId->get_error_message(),
				'external_id' => $externalId,
				'mime'        => $mime,
				'filename'    => $file['name'] ?? '',
				'data'        => $attachmentId->get_error_data(),
			)
		);
		return $attachmentId;
	}

	update_post_meta( $attachmentId, SITE_SYNC_META_MEDIA_EXTERNAL_ID, $externalId );
	return true;
}
/**
 * Applies a delete item from the sync payload to the local site.
 *
 * Resolves the local object for the given external ID and removes it: posts
 * are trashed, media attachments are force-deleted and terms are deleted from
 * their taxonomy. A delete for an object that does not exist locally counts
 * as success. The `site_sync/applying_delete` action is fired just before the
 * local deletion.
 *
 * @param array $item The delete item data from the sync payload.
 * @return true|WP_Error True on success, WP_Error on failure.
 */
private function apply_delete( array $item ) {
	$kind     = $item['type'] ?? '';
	$external = $item['external_id'] ?? '';
	$tax      = $item['taxonomy'] ?? '';

	if ( ! ( $kind && $external ) ) {
		return new WP_Error( 'site_sync_invalid_delete', __( 'Invalid delete payload.', 'site-sync' ) );
	}

	// Step 1: look up the local object for the supported entity types.
	if ( 'post' === $kind ) {
		$localId = $this->find_post_by_external_id( $external );
	} elseif ( 'media' === $kind ) {
		$localId = $this->find_media_by_external_id( $external );
	} elseif ( 'term' === $kind ) {
		if ( ! $tax ) {
			return new WP_Error( 'site_sync_invalid_delete', __( 'Missing taxonomy for term delete.', 'site-sync' ) );
		}
		$localId = $this->find_term_by_external_id( $external, $tax );
	} else {
		return new WP_Error( 'site_sync_unknown_delete', __( 'Unknown delete entity type.', 'site-sync' ) );
	}

	// Nothing to delete locally; treat as already applied.
	if ( ! $localId ) {
		return true;
	}

	// Step 2: announce the sync-driven deletion, then perform it.
	do_action( 'site_sync/applying_delete', $kind, $localId );
	if ( 'post' === $kind ) {
		wp_trash_post( $localId );
	} elseif ( 'media' === $kind ) {
		wp_delete_attachment( $localId, true );
	} else {
		wp_delete_term( $localId, $tax );
	}

	return true;
}
/**
 * Derive a reasonable extension from MIME or filename.
 *
 * The MIME type is looked up in WordPress' MIME map first; the filename (or
 * URL) is only consulted when that lookup yields nothing.
 *
 * @param string $mime     The MIME type to use for extension lookup. Case and any
 *                         parameters (e.g. "; charset=utf-8") are ignored.
 * @param string $filename The filename or URL to use for extension extraction.
 * @return string The determined file extension, including the dot, or an empty string.
 */
private function determine_extension( string $mime, string $filename = '' ): string {
	// MIME types are case-insensitive and may carry parameters after a semicolon.
	$mime = strtolower( trim( explode( ';', $mime, 2 )[0] ) );
	if ( '' !== $mime ) {
		// Keys of the map are pipe-separated extension lists such as "jpg|jpeg|jpe".
		$extPattern = array_search( $mime, wp_get_mime_types(), true );
		if ( false !== $extPattern ) {
			$first = explode( '|', (string) $extPattern )[0];
			if ( '' !== $first ) {
				return '.' . $first;
			}
		}
	}
	// Take the last dot-suffix of the final path segment, ignoring a trailing
	// query string or fragment so that "a.jpg?ver=2" yields ".jpg".
	if ( '' !== $filename && preg_match( '/\.([^.\/\\\\?#]+)(?:[?#].*)?$/s', $filename, $match ) ) {
		return '.' . strtolower( $match[1] );
	}
	return '';
}
/**
 * Sanitize a media filename and ensure single extension.
 *
 * @param string $name The original filename.
 * @param string $mime The MIME type of the file.
 * @param string $ext  The file extension (optional, with or without leading dot).
 *                     When empty it is derived from `$mime` / `$name`.
 * @return string The sanitized filename with a single extension. The base name
 *                falls back to "media" when nothing usable remains.
 */
private function normalize_media_filename( string $name, string $mime = '', string $ext = '' ): string {
	$base     = pathinfo( $name, PATHINFO_FILENAME );
	$safeBase = sanitize_file_name( $base ? $base : 'media' );
	if ( '' === $safeBase ) {
		// Sanitizing can strip every character; never return a bare ".ext".
		$safeBase = 'media';
	}
	$cleanExt = $ext ? $ext : $this->determine_extension( $mime, $name );
	// Strip any leading dot so exactly one is re-added, and none for an empty extension.
	$cleanExt = ltrim( $cleanExt, '.' );
	return '' === $cleanExt ? $safeBase : $safeBase . '.' . $cleanExt;
}
/**
 * Temporarily allow a specific mime for uploads/sideloads.
 *
 * Installs two filters: one on `upload_mimes` that adds the MIME type to the
 * allowed list when it is not already there, and one on
 * `wp_check_filetype_and_ext` that reports the given MIME type and extension
 * for the checked file. Pass the result to remove_temporary_mime_filters()
 * to uninstall them.
 *
 * @param string $mime The MIME type to allow.
 * @param string $ext  The file extension to allow (optional).
 * @return array|null The added filter callbacks keyed `mimes` and `check`, or null if not applied.
 */
private function temporarily_allow_mime( string $mime, string $ext = '' ): ?array {
	if ( ! $mime ) {
		return null;
	}
	$bareExt = ltrim( $ext, '.' );

	// Adds the MIME type to the allowed upload types unless it is already listed.
	$allowMime = function ( $allowed ) use ( $mime, $bareExt ) {
		$allowed = is_array( $allowed ) ? $allowed : array();
		if ( in_array( $mime, $allowed, true ) ) {
			return $allowed;
		}
		$key = $bareExt;
		if ( ! $key ) {
			// No extension known: use a synthetic key derived from the MIME type.
			$key = 'sync-' . sanitize_key( str_replace( '/', '-', $mime ) );
		}
		$allowed[ $key ] = $mime;
		return $allowed;
	};

	// Reports the expected extension/MIME pair for the file being checked.
	$forceType = function ( $types, $file, $filename, $mimes ) use ( $mime, $bareExt ) { // phpcs:ignore
		$resolved = $bareExt ? $bareExt : pathinfo( $filename, PATHINFO_EXTENSION );
		if ( ! $mime || ! $resolved ) {
			return $types;
		}
		return array(
			'ext'             => ltrim( strtolower( $resolved ), '.' ),
			'type'            => $mime,
			'proper_filename' => $filename,
		);
	};

	add_filter( 'upload_mimes', $allowMime, 10, 1 );
	add_filter( 'wp_check_filetype_and_ext', $forceType, 10, 4 );

	return array(
		'mimes' => $allowMime,
		'check' => $forceType,
	);
}
/**
 * Uninstalls the temporary MIME filters added for uploads/sideloads.
 *
 * @param array|null $filters The callbacks to remove, as returned by temporarily_allow_mime().
 *                            Anything that is not a non-empty array is ignored.
 * @return void
 */
private function remove_temporary_mime_filters( $filters ): void {
	if ( ! is_array( $filters ) || ! $filters ) {
		return;
	}
	// Map each key of the filter set to the hook its callback was attached to.
	$hooks = array(
		'mimes' => 'upload_mimes',
		'check' => 'wp_check_filetype_and_ext',
	);
	foreach ( $hooks as $key => $hook ) {
		if ( ! empty( $filters[ $key ] ) ) {
			remove_filter( $hook, $filters[ $key ], 10 );
		}
	}
}
/**
 * Writes an error log entry enriched with the details of a WP_Error.
 *
 * @param string   $message The error message to log.
 * @param WP_Error $error   The WP_Error whose code, message and data are attached as context.
 * @param string   $source  The source of the error (e.g., 'cron', 'manual').
 *
 * @return void
 */
private function report_error( string $message, WP_Error $error, string $source ): void {
	$context = array(
		'source' => $source,
		'code'   => $error->get_error_code(),
		'msg'    => $error->get_error_message(),
		'data'   => $error->get_error_data(),
	);
	Log::error( $message, $context );
}
}