feature: Add split-direction (push/pull) process

This commit is contained in:
Keith Solomon
2025-12-14 21:57:25 -06:00
parent 9375ca61a0
commit e44a33cd8e
4 changed files with 214 additions and 52 deletions

View File

@@ -56,6 +56,8 @@ class Sync_Engine {
public function hooks(): void {
add_action( 'site_sync/run_cycle', array( $this, 'run_cycle' ), 10, 1 );
add_action( 'site_sync/manual_trigger', array( $this, 'run_manual' ), 10, 1 );
add_action( 'site_sync/manual_push', array( $this, 'run_manual_push' ), 10, 1 );
add_action( 'site_sync/manual_pull', array( $this, 'run_manual_pull' ), 10, 1 );
add_action( 'trashed_post', array( $this, 'mark_post_tombstone' ) );
add_action( 'before_delete_post', array( $this, 'mark_post_tombstone' ) );
add_action( 'delete_term', array( $this, 'mark_term_tombstone' ), 10, 4 );
@@ -83,13 +85,36 @@ class Sync_Engine {
$this->run_once( 'manual' );
}
/**
* Manual push-only trigger from admin.
*
* @param array $settings Settings array.
*/
public function run_manual_push( $settings = array() ): void { // phpcs:ignore
$this->run_once( 'manual_push', true, false, false );
}
/**
* Manual pull-only trigger from admin.
*
* @param array $settings Settings array.
*/
public function run_manual_pull( $settings = array() ): void { // phpcs:ignore
$this->run_once( 'manual_pull', false, true, false );
}
/**
* Main sync loop.
*
* @param string $source The source of the sync trigger (e.g., 'cron' or 'manual').
* @param string $source The source of the sync trigger (e.g., 'cron' or 'manual').
* @param bool $doPush Whether to perform a push operation.
* @param bool $doPull Whether to perform a pull operation.
* @param bool $respectSettings Whether to respect the settings for push/pull enablement.
*/
public function run_once( string $source ): void {
public function run_once( string $source, bool $doPush = true, bool $doPull = true, bool $respectSettings = true ): void {
$settings = $this->settings->ensure_defaults();
$allowPush = $doPush;
$allowPull = $doPull;
$outCounts = array(
'posts' => 0,
'terms' => 0,
@@ -102,6 +127,30 @@ class Sync_Engine {
'errors' => 0,
);
if ( $respectSettings ) {
$allowPush = $allowPush && ! empty( $settings['enable_push'] );
$allowPull = $allowPull && ! empty( $settings['enable_pull'] );
}
$mode = array(
'push' => $allowPush,
'pull' => $allowPull,
);
if ( ! $allowPush && ! $allowPull ) {
Log::info( 'Sync skipped; both directions disabled.', array( 'source' => $source ) );
$this->state->record_run(
array(
'error' => 'Sync directions disabled',
'counts' => array(
'sent' => $outCounts,
'received' => $inCounts,
),
)
);
return;
}
if ( empty( $settings['peer_url'] ) ) {
Log::warning( 'Sync skipped; peer URL not configured.', array( 'source' => $source ) );
$this->state->record_run( array( 'error' => 'Peer URL not configured' ) );
@@ -123,54 +172,62 @@ class Sync_Engine {
)
);
$outbound = $this->provide_outbox( true );
$outCounts = $outbound['meta']['counts'] ?? $outCounts;
$payload = $outbound;
if ( isset( $payload['meta'] ) ) {
unset( $payload['meta'] );
}
$push = $this->transport->post( '/wp-json/site-sync/v1/inbox', $payload );
if ( is_wp_error( $push ) ) {
$this->report_error( 'Push to peer failed', $push, $source );
$this->state->record_run(
array(
'error' => $push->get_error_message(),
'counts' => array(
'sent' => $outCounts,
'received' => $inCounts,
),
)
);
return;
}
$this->commit_outbox( $outbound['cursor'], $outbound['meta']['counts'] ?? array() );
$pull = $this->transport->get( '/wp-json/site-sync/v1/outbox' );
if ( is_wp_error( $pull ) ) {
$this->report_error( 'Pull from peer failed', $pull, $source );
$this->state->record_run(
array(
'error' => $pull->get_error_message(),
'counts' => array(
'sent' => $outCounts,
'received' => $inCounts,
),
)
);
return;
}
if ( is_array( $pull ) && ! empty( $pull['items'] ) ) {
$result = $this->handle_inbox( $pull );
if ( is_array( $result ) ) {
$inCounts = array(
'applied' => (int) ( $result['applied'] ?? 0 ),
'skipped' => (int) ( $result['skipped'] ?? 0 ),
'errors' => is_array( $result['errors'] ?? null ) ? count( $result['errors'] ) : 0,
);
if ( $allowPush ) {
$outbound = $this->provide_outbox( true );
$outCounts = $outbound['meta']['counts'] ?? $outCounts;
$payload = $outbound;
if ( isset( $payload['meta'] ) ) {
unset( $payload['meta'] );
}
$push = $this->transport->post( '/wp-json/site-sync/v1/inbox', $payload );
if ( is_wp_error( $push ) ) {
$this->report_error( 'Push to peer failed', $push, $source );
$this->state->record_run(
array(
'error' => $push->get_error_message(),
'counts' => array(
'sent' => $outCounts,
'received' => $inCounts,
),
)
);
return;
}
$this->commit_outbox( $outbound['cursor'], $outbound['meta']['counts'] ?? array() );
} else {
Log::info( 'Push skipped for this run.', array( 'source' => $source ) );
}
if ( $allowPull ) {
$pull = $this->transport->get( '/wp-json/site-sync/v1/outbox' );
if ( is_wp_error( $pull ) ) {
$this->report_error( 'Pull from peer failed', $pull, $source );
$this->state->record_run(
array(
'error' => $pull->get_error_message(),
'counts' => array(
'sent' => $outCounts,
'received' => $inCounts,
),
)
);
return;
}
if ( is_array( $pull ) && ! empty( $pull['items'] ) ) {
$result = $this->handle_inbox( $pull );
if ( is_array( $result ) ) {
$inCounts = array(
'applied' => (int) ( $result['applied'] ?? 0 ),
'skipped' => (int) ( $result['skipped'] ?? 0 ),
'errors' => is_array( $result['errors'] ?? null ) ? count( $result['errors'] ) : 0,
);
}
}
} else {
Log::info( 'Pull skipped for this run.', array( 'source' => $source ) );
}
$this->state->record_run(
@@ -186,6 +243,7 @@ class Sync_Engine {
'Sync cycle completed',
array(
'source' => $source,
'mode' => $mode,
'sent' => $outCounts,
'received' => $inCounts,
)