Spaces:
No application file
No application file
| namespace Mautic\LeadBundle\Model; | |
| use Doctrine\ORM\EntityManagerInterface; | |
| use Doctrine\ORM\ORMException; | |
| use Mautic\CoreBundle\Helper\Chart\ChartQuery; | |
| use Mautic\CoreBundle\Helper\Chart\LineChart; | |
| use Mautic\CoreBundle\Helper\CoreParametersHelper; | |
| use Mautic\CoreBundle\Helper\DateTimeHelper; | |
| use Mautic\CoreBundle\Helper\InputHelper; | |
| use Mautic\CoreBundle\Helper\PathsHelper; | |
| use Mautic\CoreBundle\Helper\UserHelper; | |
| use Mautic\CoreBundle\Model\FormModel; | |
| use Mautic\CoreBundle\Model\NotificationModel; | |
| use Mautic\CoreBundle\ProcessSignal\ProcessSignalService; | |
| use Mautic\CoreBundle\Security\Permissions\CorePermissions; | |
| use Mautic\CoreBundle\Translation\Translator; | |
| use Mautic\LeadBundle\Entity\Company; | |
| use Mautic\LeadBundle\Entity\Import; | |
| use Mautic\LeadBundle\Entity\ImportRepository; | |
| use Mautic\LeadBundle\Entity\LeadEventLog; | |
| use Mautic\LeadBundle\Entity\LeadEventLogRepository; | |
| use Mautic\LeadBundle\Event\ImportEvent; | |
| use Mautic\LeadBundle\Event\ImportProcessEvent; | |
| use Mautic\LeadBundle\Exception\ImportDelayedException; | |
| use Mautic\LeadBundle\Exception\ImportFailedException; | |
| use Mautic\LeadBundle\Helper\Progress; | |
| use Mautic\LeadBundle\LeadEvents; | |
| use Psr\Log\LoggerInterface; | |
| use Symfony\Component\EventDispatcher\EventDispatcherInterface; | |
| use Symfony\Component\HttpKernel\Exception\MethodNotAllowedHttpException; | |
| use Symfony\Component\Routing\Generator\UrlGeneratorInterface; | |
| use Symfony\Contracts\EventDispatcher\Event; | |
| /** | |
| * @extends FormModel<Import> | |
| */ | |
| class ImportModel extends FormModel | |
| { | |
| protected LeadEventLogRepository $leadEventLogRepo; | |
/**
 * Wires up the helpers and models used by the import workflow.
 *
 * Arguments declared with a visibility keyword are promoted to properties;
 * the remaining services (and $config) are forwarded to the parent FormModel.
 */
public function __construct(
    protected PathsHelper $pathsHelper,
    protected LeadModel $leadModel,
    protected NotificationModel $notificationModel,
    protected CoreParametersHelper $config,
    protected CompanyModel $companyModel,
    EntityManagerInterface $em,
    CorePermissions $security,
    EventDispatcherInterface $dispatcher,
    UrlGeneratorInterface $router,
    Translator $translator,
    UserHelper $userHelper,
    LoggerInterface $mauticLogger,
    private ProcessSignalService $processSignalService
) {
    // Keep the event log repository at hand; it is used for every imported row.
    $this->leadEventLogRepo = $leadModel->getEventLogRepository();

    parent::__construct(
        $em,
        $security,
        $dispatcher,
        $router,
        $translator,
        $userHelper,
        $mauticLogger,
        $config
    );
}
/**
 * Picks the import that should be processed next.
 *
 * Queries the repository for imports in the QUEUED or DELAYED status
 * (second argument 1 — presumably the result limit) and returns the first hit.
 *
 * @return Import|null The first result when it is an Import, null otherwise
 */
public function getImportToProcess()
{
    $candidates = $this->getRepository()->getImportsWithStatuses([Import::QUEUED, Import::DELAYED], 1);
    $next       = $candidates[0] ?? null;

    return $next instanceof Import ? $next : null;
}
/**
 * Tells whether another import may start right now.
 *
 * @return bool True while the number of imports in progress is below the configured limit
 */
public function checkParallelImportLimit(): bool
{
    $allowed = $this->getParallelImportLimit();
    $running = $this->getRepository()->countImportsInProgress();

    return $running < $allowed;
}
/**
 * Reads the 'parallel_import_limit' configuration value.
 *
 * @param int $default Value handed to the configuration helper as the fallback
 *
 * @return int
 */
public function getParallelImportLimit($default = 1)
{
    $limit = $this->config->get('parallel_import_limit', $default);

    return $limit;
}
/**
 * Generates a HTML link to the import detail.
 *
 * The generated URL and the original file name are HTML-escaped before being
 * placed into the markup so that special characters in either cannot break out
 * of the attribute or inject markup (the file name presumably originates from
 * the uploaded file — treat it as untrusted).
 *
 * @return string An anchor element labelled "<original file> (<import id>)"
 */
public function generateLink(Import $import): string
{
    $url = $this->router->generate(
        'mautic_import_action',
        ['objectAction' => 'view', 'object' => 'lead', 'objectId' => $import->getId()]
    );

    $escapeFlags = ENT_QUOTES | ENT_SUBSTITUTE;

    return '<a href="'.htmlspecialchars((string) $url, $escapeFlags, 'UTF-8').'" data-toggle="ajax">'
        .htmlspecialchars((string) $import->getOriginalFile(), $escapeFlags, 'UTF-8')
        .' ('.$import->getId().')</a>';
}
/**
 * Marks imports that the repository reports as "ghosts" as failed.
 *
 * Each ghost import gets the FAILED status, a translated status info and its
 * file removed; when the import has a creator, that user receives a notification.
 * All touched imports are saved in one call at the end.
 *
 * @return null Nothing meaningful is returned
 */
public function setGhostImportsAsFailed()
{
    // Delay handed to the repository and shown in the status info as %limit%
    // (unit is defined by getGhostImports() — presumably hours, TODO confirm).
    $ghostDelay   = 2;
    $ghostImports = $this->getRepository()->getGhostImports($ghostDelay, 5);

    if (empty($ghostImports)) {
        return null;
    }

    foreach ($ghostImports as $ghost) {
        $reason = $this->translator->trans('mautic.lead.import.ghost.limit.hit', ['%limit%' => $ghostDelay]);

        $ghost->setStatus($ghost::FAILED)
            ->setStatusInfo($reason)
            ->removeFile();

        if (!$ghost->getCreatedBy()) {
            // Nobody to notify.
            continue;
        }

        $message = $this->translator->trans(
            'mautic.lead.import.result.info',
            ['%import%' => $this->generateLink($ghost)]
        );
        $header  = $this->translator->trans('mautic.lead.import.failed');
        $creator = $this->em->getReference(\Mautic\UserBundle\Entity\User::class, $ghost->getCreatedBy());

        $this->notificationModel->addNotification(
            $message,
            'info',
            false,
            $header,
            'ri-download-line',
            null,
            $creator
        );
    }

    $this->saveEntities($ghostImports);
}
/**
 * Start import. This is meant for the CLI command since it will import
 * the whole file at once.
 *
 * Steps: stuck imports are failed first, the import is checked with canProceed()
 * and against the parallel import limit, the progress helper is primed, process()
 * is executed and finally the import is ended, saved and its creator notified.
 *
 * @param int $limit Number of records to import before delaying the import. 0 will import all
 *
 * @throws ImportFailedException  When the import cannot proceed, process() reports a failure
 *                                or a Doctrine ORM exception is caught
 * @throws ImportDelayedException When the parallel import limit has been reached
 */
public function beginImport(Import $import, Progress $progress, $limit = 0): void
{
    $this->setGhostImportsAsFailed();

    // No emptiness check on $import: the non-nullable type hint already guarantees an object.

    if (!$import->canProceed()) {
        $this->saveEntity($import);
        $msg = 'import cannot be processed because '.$import->getStatusInfo();
        $this->logDebug($msg, $import);

        throw new ImportFailedException($msg);
    }

    if (!$this->checkParallelImportLimit()) {
        $info = $this->translator->trans(
            'mautic.lead.import.parallel.limit.hit',
            ['%limit%' => $this->getParallelImportLimit()]
        );
        $import->setStatus($import::DELAYED)->setStatusInfo($info);
        $this->saveEntity($import);
        $msg = 'import is delayed because parrallel limit was hit. '.$import->getStatusInfo();
        $this->logDebug($msg, $import);

        throw new ImportDelayedException($msg);
    }

    $processed = $import->getProcessedRows();
    $total     = $import->getLineCount();
    $pending   = $total - $processed;

    // With a limit smaller than the remaining rows, progress is reported relative to the limit.
    if ($limit && $limit < $pending) {
        $processed = 0;
        $total     = $limit;
    }

    $progress->setTotal($total);
    $progress->setDone($processed);

    $import->start();

    // Save the start changes so the user could see it
    $this->saveEntity($import);
    $this->logDebug('The background import is about to start', $import);

    try {
        if (!$this->process($import, $progress, $limit)) {
            throw new ImportFailedException((string) $import->getStatusInfo());
        }
    } catch (ORMException $e) {
        // The EntityManager is probably closed. The entity cannot be saved.
        $info = $this->translator->trans(
            'mautic.lead.import.database.exception',
            ['%message%' => $e->getMessage()]
        );
        $import->setStatus($import::DELAYED)->setStatusInfo($info);

        // Chain the ORM exception so the root cause is not lost
        // (assumes ImportFailedException keeps the standard \Exception constructor).
        throw new ImportFailedException('Database had been overloaded', 0, $e);
    }

    $import->end();
    $this->logDebug('The background import has ended', $import);

    // Save the end changes so the user could see it
    $this->saveEntity($import);

    if ($import->getCreatedBy()) {
        $this->notificationModel->addNotification(
            $this->translator->trans(
                'mautic.lead.import.result.info',
                ['%import%' => $this->generateLink($import)]
            ),
            'info',
            false,
            $this->translator->trans('mautic.lead.import.completed'),
            'ri-download-line',
            null,
            $this->em->getReference(\Mautic\UserBundle\Entity\User::class, $import->getCreatedBy())
        );
    }
}
/**
 * Import the CSV file from configuration in the $import entity.
 *
 * Reading resumes after the last imported line. Each data row is parsed, validated,
 * dispatched as an IMPORT_ON_PROCESS event and recorded in the lead event log.
 * The loop ends when the file is exhausted, the batch counter reaches zero (and is not
 * reset), the import gets unpublished, a process signal is caught or $limit rows were handled.
 *
 * @param int $limit Number of records to import before delaying the import
 *
 * @return bool False only when the file cannot be opened, true otherwise
 *
 * @throws ORMException When a row failed and the entity manager is closed
 */
public function process(Import $import, Progress $progress, $limit = 0): bool
{
    try {
        $file = new \SplFileObject($import->getFilePath());
    } catch (\Exception $e) {
        $import->setStatusInfo('SplFileObject cannot read the file. '.$e->getMessage());
        $import->setStatus(Import::FAILED);
        $this->logDebug('import cannot be processed because '.$import->getStatusInfo(), $import);

        return false;
    }

    $lastImportedLine = $import->getLastLineImported();
    $headers          = $import->getHeaders();
    $headerCount      = count($headers);
    $config           = $import->getParserConfig();
    $counter          = 0;
    $stopped          = false;

    // SplFileObject::seek() is zero-based, so this lands on the first line not imported yet.
    $file->seek($lastImportedLine);

    $lineNumber = $lastImportedLine + 1;
    $this->logDebug('The import is starting on line '.$lineNumber, $import);

    $batchSize = $config['batchlimit'];

    // Convert to field names
    array_walk($headers, function (&$val): void {
        $val = strtolower(InputHelper::alphanum($val, false, '_'));
    });

    while ($batchSize && !$file->eof()) {
        $string = $file->current();
        $file->next();
        $data = str_getcsv($string, $config['delimiter'], $config['enclosure'], $config['escape']);
        $import->setLastLineImported($lineNumber);

        // Ignore the header row
        if (1 === $lineNumber) {
            ++$lineNumber;
            continue;
        }

        // Ensure the progress is changing
        ++$lineNumber;
        --$batchSize;
        $progress->increase();

        $errorMessage = null;
        $skipRow      = false;

        // NOTE(review): $lineNumber was already advanced past the current row above, so the
        // event log and the debug messages below carry the number of the *next* line.
        // Confirm whether that offset is intended before changing it.
        $eventLog = $this->initEventLog($import, $lineNumber);

        if ($this->isEmptyCsvRow($data)) {
            $errorMessage = 'mautic.lead.import.error.line_empty';
        }

        // Also pads $data with empty strings when the row is shorter than the header.
        if ($this->hasMoreValuesThanColumns($data, $headerCount)) {
            $errorMessage = 'mautic.lead.import.error.header_mismatch';
        }

        if (!$errorMessage) {
            $data = $this->trimArrayValues($data);

            if (!array_filter($data)) {
                // Whitespace-only row: nothing to import and nothing to log. Do NOT jump to the
                // next iteration right away — the batch bookkeeping below must still run,
                // otherwise a background import leaves the loop when such a row ends a batch.
                $skipRow = true;
            } else {
                $data = array_combine($headers, $data);

                try {
                    $event = new ImportProcessEvent($import, $eventLog, $data);

                    $this->dispatcher->dispatch($event, LeadEvents::IMPORT_ON_PROCESS);

                    if ($event->wasMerged()) {
                        $this->logDebug('Entity on line '.$lineNumber.' has been updated', $import);
                        $import->increaseUpdatedCount();
                    } else {
                        $this->logDebug('Entity on line '.$lineNumber.' has been created', $import);
                        $import->increaseInsertedCount();
                    }
                } catch (\Exception $e) {
                    // Email validation likely failed
                    $errorMessage = $e->getMessage();
                }
            }
        }

        if (!$skipRow) {
            if ($errorMessage) {
                // Log the error first
                $import->increaseIgnoredCount();
                $this->logDebug('Line '.$lineNumber.' error: '.$errorMessage, $import);

                if (!$this->em->isOpen()) {
                    // Something bad must have happened if the entity manager is closed.
                    // We will not be able to save any entities.
                    throw new ORMException($errorMessage);
                }

                // This should be called only if the entity manager is open
                $this->logImportRowError($eventLog, $errorMessage);
            } else {
                $this->leadEventLogRepo->saveEntity($eventLog);
            }

            // Release entities in Doctrine's memory to prevent memory leak
            $this->em->detach($eventLog);

            if (null !== $leadEntity = $eventLog->getLead()) {
                $this->em->detach($leadEntity);

                $company        = $leadEntity->getCompany();
                $primaryCompany = $leadEntity->getPrimaryCompany();

                if ($company instanceof Company) {
                    $this->em->detach($company);
                }

                if ($primaryCompany instanceof Company) {
                    $this->em->detach($primaryCompany);
                }
            }
        }

        $eventLog = null;
        $data     = null;

        // Save Import entity once per batch so the user could see the progress
        if (0 === $batchSize && $import->isBackgroundProcess()) {
            $isPublished = $this->getRepository()->getValue($import->getId(), 'is_published');

            if (!$isPublished) {
                $import->setStatus($import::STOPPED);
            }

            $this->saveEntity($import);
            $this->dispatchEvent('batch_processed', $import);

            // Stop the import loop if the import got unpublished
            if (!$isPublished) {
                $this->logDebug('The import has been unpublished. Stopping the import now.', $import);
                $stopped = true;
                break;
            }

            $batchSize = $config['batchlimit'];
        }

        if ($this->processSignalService->isSignalCaught()) {
            break;
        }

        if ($skipRow) {
            // Skipped rows do not count towards $limit.
            continue;
        }

        ++$counter;

        if ($limit && $counter >= $limit) {
            break;
        }
    }

    // Unfinished imports are delayed so they can be resumed later — unless the import was
    // explicitly stopped above, in which case the STOPPED status must not be overwritten.
    if (!$stopped && $import->getLastLineImported() < $import->getLineCount()) {
        $import->setStatus($import::DELAYED);
        $this->saveEntity($import);
    }

    // Close the file
    $file = null;

    return true;
}
/**
 * Compares the number of values in a CSV row with the number of header columns.
 *
 * A row that is too short is padded in place (by reference) with empty strings,
 * a row that is too long is reported to the caller.
 *
 * @param array $data        CSV row; may be extended with empty strings
 * @param int   $headerCount Number of columns in the CSV header
 *
 * @return bool True when the row holds more values than there are columns
 */
public function hasMoreValuesThanColumns(array &$data, $headerCount): bool
{
    $valueCount = count($data);

    if ($headerCount === $valueCount) {
        return false;
    }

    $missing = $headerCount - $valueCount;

    if ($missing <= 0) {
        return true;
    }

    // Append empty strings under the next numeric keys.
    $data += array_fill($valueCount, $missing, '');

    return false;
}
/**
 * Trim all values in a one dimensional array.
 *
 * Keys are preserved. Every value is cast to string before trimming so that
 * null (e.g. an empty CSV cell) does not trigger the PHP 8.1+ deprecation for
 * passing null to trim(); the result is the same empty string as before.
 *
 * @param array<int|string, scalar|null> $data
 *
 * @return array<int|string, string>
 */
public function trimArrayValues(array $data): array
{
    return array_map(
        static fn ($value): string => trim((string) $value),
        $data
    );
}
/**
 * Decides whether a parsed CSV row carries no data.
 *
 * A row counts as empty when it is not an array, has no elements, consists of a
 * single null/empty-string cell, or contains only values PHP treats as falsy.
 *
 * @param mixed $row
 */
public function isEmptyCsvRow($row): bool
{
    if (!is_array($row) || [] === $row) {
        return true;
    }

    $isSingleBlankCell = 1 === count($row) && ('' === $row[0] || null === $row[0]);

    return $isSingleBlankCell || [] === array_filter($row);
}
/**
 * Stores an event log entry for a row that could not be imported.
 *
 * The translated error is attached as the 'error' property and the entry is
 * flagged with the 'failed' action before it is saved.
 *
 * @param string $errorMessage Translation key or plain message
 */
public function logImportRowError(LeadEventLog $eventLog, $errorMessage): void
{
    $translatedError = $this->translator->trans($errorMessage);

    $eventLog->addProperty('error', $translatedError)->setAction('failed');

    $this->leadEventLogRepo->saveEntity($eventLog);
}
/**
 * Builds a fresh LeadEventLog describing one row of the given import.
 *
 * The entry is attributed to the import's creator, bound to the 'import' object
 * with the import's id and carries the line number and original file name.
 *
 * @param int $lineNumber
 */
public function initEventLog(Import $import, $lineNumber): LeadEventLog
{
    $log = new LeadEventLog();

    $log->setUserId($import->getCreatedBy())
        ->setUserName($import->getCreatedByUser())
        ->setBundle($import->getObject())
        ->setObject('import')
        ->setObjectId($import->getId())
        ->setProperties([
            'line' => $lineNumber,
            'file' => $import->getOriginalFile(),
        ]);

    return $log;
}
/**
 * Get line chart data of imported rows.
 *
 * The range is widened by one minute on each side and the seconds are zeroed so
 * the chart can be displayed by minutes. The adjustment is done on private copies:
 * the DateTime objects passed in by the caller are left untouched, and immutable
 * date objects are supported as well.
 *
 * @param string $unit       {@link php.net/manual/en/function.date.php#refsect1-function.date-parameters}
 * @param string $dateFormat
 * @param array  $filter     Extra filters; 'object' and 'bundle' are always overwritten
 *
 * @return array Rendered chart data
 */
public function getImportedRowsLineChartData($unit, \DateTimeInterface $dateFrom, \DateTimeInterface $dateTo, $dateFormat = null, $filter = []): array
{
    $filter['object'] = 'import';
    $filter['bundle'] = 'lead';

    // Work on copies so the caller's objects are not modified as a side effect.
    $from = \DateTime::createFromInterface($dateFrom);
    $to   = \DateTime::createFromInterface($dateTo);

    // Clear the times for display by minutes
    $from->modify('-1 minute');
    $from->setTime((int) $from->format('H'), (int) $from->format('i'), 0);
    $to->modify('+1 minute');
    $to->setTime((int) $to->format('H'), (int) $to->format('i'), 0);

    $query = new ChartQuery($this->em->getConnection(), $from, $to, $unit);
    $chart = new LineChart($unit, $from, $to, $dateFormat);
    $data  = $query->fetchTimeData('lead_event_log', 'date_added', $filter);

    $chart->setDataset($this->translator->trans('mautic.lead.import.processed.rows'), $data);

    return $chart->render();
}
/**
 * Fetches the failed rows recorded for an import.
 *
 * Only the 'properties' and 'id' columns are requested from the event log repository.
 *
 * @param int    $importId
 * @param string $object
 *
 * @return array|null Null when no import id is given
 */
public function getFailedRows($importId = null, $object = 'lead')
{
    return $importId
        ? $this->getEventLogRepository()->getFailedRows($importId, ['select' => 'properties,id'], $object)
        : null;
}
/**
 * Gives access to the Doctrine repository of the Import entity.
 *
 * @return ImportRepository
 */
public function getRepository()
{
    $importRepository = $this->em->getRepository(Import::class);

    return $importRepository;
}
/**
 * Gives access to the Doctrine repository of the LeadEventLog entity.
 *
 * @return LeadEventLogRepository
 */
public function getEventLogRepository()
{
    $eventLogRepository = $this->em->getRepository(LeadEventLog::class);

    return $eventLogRepository;
}
/**
 * Names the permission base used for imports.
 */
public function getPermissionBase(): string
{
    $permissionBase = 'lead:imports';

    return $permissionBase;
}
/**
 * Builds a CSV file name from the current time.
 *
 * The name is the 'YmdHis' formatted timestamp from DateTimeHelper::toUtcString()
 * followed by the '.csv' extension, so its resolution is one second.
 */
public function getUniqueFileName(): string
{
    $timestamp = (new DateTimeHelper())->toUtcString('YmdHis');

    return $timestamp.'.csv';
}
/**
 * Resolves the directory for lead import files via the paths helper.
 */
public function getImportDir(): string
{
    $importDir = $this->pathsHelper->getImportLeadsPath();

    return $importDir;
}
/**
 * Returns the Import for the given id, or a brand-new Import when the id is null.
 */
public function getEntity($id = null): ?Import
{
    return null === $id ? new Import() : parent::getEntity($id);
}
/**
 * Dispatches the LeadEvents import event that corresponds to the given action.
 *
 * Supported actions: pre_save, post_save, pre_delete, post_delete and batch_processed.
 * When no event object is supplied, an ImportEvent is created for the entity.
 *
 * @param string $action Action name; unknown actions are ignored
 * @param Import $entity Entity the event is about
 * @param bool   $isNew  Passed to the ImportEvent when one has to be created
 *
 * @return Event|null The dispatched event, or null for an unknown action or when
 *                    nobody listens to the event
 *
 * @throws MethodNotAllowedHttpException When $entity is not an Import
 */
protected function dispatchEvent($action, &$entity, $isNew = false, ?Event $event = null): ?Event
{
    if (!$entity instanceof Import) {
        throw new MethodNotAllowedHttpException(['Import']);
    }

    $name = match ($action) {
        'pre_save'        => LeadEvents::IMPORT_PRE_SAVE,
        'post_save'       => LeadEvents::IMPORT_POST_SAVE,
        'pre_delete'      => LeadEvents::IMPORT_PRE_DELETE,
        'post_delete'     => LeadEvents::IMPORT_POST_DELETE,
        'batch_processed' => LeadEvents::IMPORT_BATCH_PROCESSED,
        default           => null,
    };

    if (null === $name || !$this->dispatcher->hasListeners($name)) {
        return null;
    }

    if (null === $event) {
        $event = new ImportEvent($entity, $isNew);
        $event->setEntityManager($this->em);
    }

    $this->dispatcher->dispatch($event, $name);

    return $event;
}
/**
 * Writes a debug message to the logger, but only when MAUTIC_ENV is 'dev'.
 *
 * The message is prefixed with "IMPORT" and, when an import is given, its id in parentheses.
 *
 * @param string $msg
 */
protected function logDebug($msg, Import $import = null)
{
    if ('dev' !== MAUTIC_ENV) {
        return;
    }

    $importId = '';

    if ($import) {
        $importId = '('.$import->getId().')';
    }

    $this->logger->debug(sprintf('IMPORT%s: %s', $importId, $msg));
}
| } | |