Spaces:
No application file
No application file
| declare(strict_types=1); | |
| namespace Mautic\IntegrationsBundle\Sync\SyncProcess; | |
| use Mautic\IntegrationsBundle\Event\CompletedSyncIterationEvent; | |
| use Mautic\IntegrationsBundle\Event\SyncEvent; | |
| use Mautic\IntegrationsBundle\Exception\IntegrationNotFoundException; | |
| use Mautic\IntegrationsBundle\IntegrationEvents; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Mapping\MappingManualDAO; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Sync\InputOptionsDAO; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Sync\ObjectIdsDAO; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Sync\Order\ObjectMappingsDAO; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Sync\Order\OrderDAO; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Sync\Order\OrderResultsDAO; | |
| use Mautic\IntegrationsBundle\Sync\DAO\Sync\Report\ReportDAO; | |
| use Mautic\IntegrationsBundle\Sync\Exception\HandlerNotSupportedException; | |
| use Mautic\IntegrationsBundle\Sync\Helper\MappingHelper; | |
| use Mautic\IntegrationsBundle\Sync\Helper\RelationsHelper; | |
| use Mautic\IntegrationsBundle\Sync\Helper\SyncDateHelper; | |
| use Mautic\IntegrationsBundle\Sync\Logger\DebugLogger; | |
| use Mautic\IntegrationsBundle\Sync\Notification\Notifier; | |
| use Mautic\IntegrationsBundle\Sync\SyncDataExchange\MauticSyncDataExchange; | |
| use Mautic\IntegrationsBundle\Sync\SyncDataExchange\SyncDataExchangeInterface; | |
| use Mautic\IntegrationsBundle\Sync\SyncProcess\Direction\Integration\IntegrationSyncProcess; | |
| use Mautic\IntegrationsBundle\Sync\SyncProcess\Direction\Internal\MauticSyncProcess; | |
| use Mautic\IntegrationsBundle\Sync\SyncService\SyncServiceInterface; | |
| use Symfony\Component\EventDispatcher\EventDispatcherInterface; | |
| class SyncProcess | |
| { | |
| private ?int $syncIteration = null; | |
| public function __construct( | |
| private SyncDateHelper $syncDateHelper, | |
| private MappingHelper $mappingHelper, | |
| private RelationsHelper $relationsHelper, | |
| private IntegrationSyncProcess $integrationSyncProcess, | |
| private MauticSyncProcess $mauticSyncProcess, | |
| private EventDispatcherInterface $eventDispatcher, | |
| private Notifier $notifier, | |
| private MappingManualDAO $mappingManualDAO, | |
| private MauticSyncDataExchange $internalSyncDataExchange, | |
| private SyncDataExchangeInterface $integrationSyncDataExchange, | |
| private InputOptionsDAO $inputOptionsDAO, | |
| private SyncServiceInterface $syncService | |
| ) { | |
| } | |
| /** | |
| * Execute sync with integration. | |
| */ | |
| public function execute(): void | |
| { | |
| defined('MAUTIC_INTEGRATION_ACTIVE_SYNC') or define('MAUTIC_INTEGRATION_ACTIVE_SYNC', 1); | |
| // Setup/prepare for the sync | |
| $this->syncDateHelper->setSyncDateTimes($this->inputOptionsDAO->getStartDateTime(), $this->inputOptionsDAO->getEndDateTime()); | |
| $this->integrationSyncProcess->setupSync($this->inputOptionsDAO, $this->mappingManualDAO, $this->integrationSyncDataExchange); | |
| $this->mauticSyncProcess->setupSync($this->inputOptionsDAO, $this->mappingManualDAO, $this->internalSyncDataExchange); | |
| if ($this->inputOptionsDAO->pullIsEnabled()) { | |
| $this->executeIntegrationSync(); | |
| } | |
| if ($this->inputOptionsDAO->pushIsEnabled()) { | |
| $this->syncDateHelper->setInternalSyncStartDateTime(); | |
| $this->executeInternalSync(); | |
| } | |
| // Tell listeners sync is done | |
| $this->eventDispatcher->dispatch( | |
| new SyncEvent($this->inputOptionsDAO), | |
| IntegrationEvents::INTEGRATION_POST_EXECUTE | |
| ); | |
| } | |
| private function executeIntegrationSync(): void | |
| { | |
| $this->syncIteration = 1; | |
| while (true) { | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| sprintf('Integration to Mautic; syncing iteration %s', $this->syncIteration), | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| $syncReport = $this->integrationSyncProcess->getSyncReport($this->syncIteration); | |
| if (!$syncReport->shouldSync()) { | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| 'Integration to Mautic; no objects were mapped to be synced', | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| break; | |
| } | |
| // Update the mappings in case objects have been converted such as Lead -> Contact | |
| $this->mappingHelper->remapIntegrationObjects($syncReport->getRemappedObjects()); | |
| // Maps relations, synchronizes missing objects if necessary | |
| $this->manageRelations($syncReport); | |
| // Convert the integrations' report into an "order" or instructions for Mautic | |
| $syncOrder = $this->mauticSyncProcess->getSyncOrder($syncReport); | |
| if (!$syncOrder->shouldSync()) { | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| 'Integration to Mautic; no object changes were recorded possible due to field direction configurations', | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| break; | |
| } | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| sprintf( | |
| 'Integration to Mautic; syncing %d total objects', | |
| $syncOrder->getObjectCount() | |
| ), | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| // Execute the sync instructions | |
| $objectMappings = $this->internalSyncDataExchange->executeSyncOrder($syncOrder); | |
| // Dispatch an event to allow subscribers to take action after this batch of objects has been synced to Mautic | |
| $orderResults = $this->getOrderResultsForIntegrationSync($syncOrder, $objectMappings); | |
| $this->eventDispatcher->dispatch( | |
| new CompletedSyncIterationEvent($orderResults, $this->syncIteration, $this->inputOptionsDAO, $this->mappingManualDAO), | |
| IntegrationEvents::INTEGRATION_BATCH_SYNC_COMPLETED_INTEGRATION_TO_MAUTIC | |
| ); | |
| unset($orderResults); | |
| if ($this->shouldStopIntegrationSync()) { | |
| break; | |
| } | |
| // Fetch the next iteration/batch | |
| ++$this->syncIteration; | |
| } | |
| } | |
| private function executeInternalSync(): void | |
| { | |
| $this->syncIteration = 1; | |
| while (true) { | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| sprintf('Mautic to integration; syncing iteration %s', $this->syncIteration), | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| $syncReport = $this->mauticSyncProcess->getSyncReport($this->syncIteration); | |
| if (!$syncReport->shouldSync()) { | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| 'Mautic to integration; no objects were mapped to be synced', | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| break; | |
| } | |
| // Convert the internal report into an "order" or instructions for the integration | |
| $syncOrder = $this->integrationSyncProcess->getSyncOrder($syncReport); | |
| if (!$syncOrder->shouldSync()) { | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| 'Mautic to integration; no object changes were recorded possible due to field direction configurations', | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| // Finalize notifications such as injecting user notifications | |
| $this->notifier->finalizeNotifications(); | |
| break; | |
| } | |
| DebugLogger::log( | |
| $this->mappingManualDAO->getIntegration(), | |
| sprintf( | |
| 'Mautic to integration; syncing %d total objects', | |
| $syncOrder->getObjectCount() | |
| ), | |
| self::class.':'.__FUNCTION__ | |
| ); | |
| // Execute the sync instructions | |
| $this->integrationSyncDataExchange->executeSyncOrder($syncOrder); | |
| // Save mappings and cleanup | |
| $this->finalizeSync($syncOrder); | |
| // Dispatch an event to allow subscribers to take action after this batch of objects has been synced to the integration | |
| $orderResults = $this->getOrderResultsForInternalSync($syncOrder); | |
| $this->eventDispatcher->dispatch( | |
| new CompletedSyncIterationEvent($orderResults, $this->syncIteration, $this->inputOptionsDAO, $this->mappingManualDAO), | |
| IntegrationEvents::INTEGRATION_BATCH_SYNC_COMPLETED_MAUTIC_TO_INTEGRATION | |
| ); | |
| unset($orderResults); | |
| // Fetch the next iteration/batch | |
| ++$this->syncIteration; | |
| } | |
| } | |
| private function manageRelations(ReportDAO $syncReport): void | |
| { | |
| // Map relations | |
| $this->relationsHelper->processRelations($this->mappingManualDAO, $syncReport); | |
| // Relation objects we need to synchronize | |
| $objectsToSynchronize = $this->relationsHelper->getObjectsToSynchronize(); | |
| if (!empty($objectsToSynchronize)) { | |
| $this->synchronizeMissingObjects($objectsToSynchronize, $syncReport); | |
| } | |
| } | |
| private function synchronizeMissingObjects(array $objectsToSynchronize, ReportDAO $syncReport): void | |
| { | |
| $inputOptions = $this->getInputOptionsForObjects($objectsToSynchronize); | |
| // We need to synchronize missing relation ids | |
| $this->processParallelSync($inputOptions); | |
| // Now we can map relations for objects we have just synchronised | |
| $this->relationsHelper->processRelations($this->mappingManualDAO, $syncReport); | |
| } | |
| /** | |
| * @throws \Mautic\IntegrationsBundle\Exception\InvalidValueException | |
| */ | |
| private function getInputOptionsForObjects(array $objectsToSynchronize): InputOptionsDAO | |
| { | |
| $mauticObjectIds = new ObjectIdsDAO(); | |
| foreach ($objectsToSynchronize as $object) { | |
| $mauticObjectIds->addObjectId($object->getObject(), $object->getObjectId()); | |
| } | |
| $integration = $this->mappingManualDAO->getIntegration(); | |
| return new InputOptionsDAO([ | |
| 'integration' => $integration, | |
| 'integration-object-id' => $mauticObjectIds, | |
| ]); | |
| } | |
| /** | |
| * @throws IntegrationNotFoundException | |
| */ | |
| private function processParallelSync($inputOptions): void | |
| { | |
| $currentSyncProcess = clone $this->integrationSyncProcess; | |
| $this->syncService->processIntegrationSync($inputOptions); | |
| // We need to bring back current $inputOptions which were overwritten by new sync | |
| $this->integrationSyncProcess = $currentSyncProcess; | |
| } | |
| private function shouldStopIntegrationSync(): bool | |
| { | |
| // We don't want to iterate sync for specific ids | |
| return null !== $this->inputOptionsDAO->getIntegrationObjectIds(); | |
| } | |
| /** | |
| * @throws IntegrationNotFoundException | |
| * @throws HandlerNotSupportedException | |
| */ | |
| private function finalizeSync(OrderDAO $syncOrder): void | |
| { | |
| // Save the mappings between Mautic objects and the integration's objects | |
| $this->mappingHelper->saveObjectMappings($syncOrder->getObjectMappings()); | |
| // Remap integration objects to Mautic objects if applicable | |
| $this->mappingHelper->remapIntegrationObjects($syncOrder->getRemappedObjects()); | |
| // Update last sync dates on existing object mappings | |
| $this->mappingHelper->updateObjectMappings($syncOrder->getUpdatedObjectMappings()); | |
| // Tell sync that these objects have been deleted and not to continue re-syncing them | |
| $this->mappingHelper->markAsDeleted($syncOrder->getDeletedObjects()); | |
| // Inject notifications | |
| $this->notifier->noteMauticSyncIssue($syncOrder->getNotifications()); | |
| // Cleanup field tracking for successfully synced objects | |
| $this->internalSyncDataExchange->cleanupProcessedObjects($syncOrder->getSuccessfullySyncedObjects()); | |
| } | |
| private function getOrderResultsForIntegrationSync(OrderDAO $syncOrder, ObjectMappingsDAO $objectMappings): OrderResultsDAO | |
| { | |
| // New objects were processed by OrderExecutioner | |
| $newObjectMappings = $objectMappings->getNewMappings(); | |
| // Updated objects were processed by OrderExecutioner | |
| $updatedObjectMappings = $objectMappings->getUpdatedMappings(); | |
| // Remapped objects | |
| $remappedObjects = $syncOrder->getRemappedObjects(); | |
| // Deleted objects | |
| $deletedObjects = $syncOrder->getDeletedObjects(); | |
| return new OrderResultsDAO($newObjectMappings, $updatedObjectMappings, $remappedObjects, $deletedObjects); | |
| } | |
| private function getOrderResultsForInternalSync(OrderDAO $syncOrder): OrderResultsDAO | |
| { | |
| // New object mappings | |
| $newObjectMappings = $syncOrder->getObjectMappings(); | |
| // Updated object mappings | |
| $updatedObjectMappings = []; | |
| foreach ($syncOrder->getUpdatedObjectMappings() as $updatedObjectMapping) { | |
| if (!$updatedObjectMapping->getObjectMapping()) { | |
| continue; | |
| } | |
| $updatedObjectMappings[] = $updatedObjectMapping->getObjectMapping(); | |
| } | |
| // Remapped objects | |
| $remappedObjects = $syncOrder->getRemappedObjects(); | |
| // Deleted objects | |
| $deletedObjects = $syncOrder->getDeletedObjects(); | |
| return new OrderResultsDAO($newObjectMappings, $updatedObjectMappings, $remappedObjects, $deletedObjects); | |
| } | |
| } | |