init
This commit is contained in:
+134
@@ -0,0 +1,134 @@
|
||||
<?php declare(strict_types = 1);
|
||||
|
||||
namespace MailPoet\Newsletter\Sending;
|
||||
|
||||
if (!defined('ABSPATH')) exit;
|
||||
|
||||
|
||||
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||
use MailPoet\Listing\ListingDefinition;
|
||||
use MailPoet\Listing\ListingRepository;
|
||||
use MailPoet\Util\Helpers;
|
||||
use MailPoetVendor\Doctrine\ORM\QueryBuilder;
|
||||
|
||||
class ScheduledTaskSubscribersListingRepository extends ListingRepository {
|
||||
public function getGroups(ListingDefinition $definition): array {
|
||||
$queryBuilder = clone $this->queryBuilder;
|
||||
$this->applyFromClause($queryBuilder);
|
||||
$this->applyParameters($queryBuilder, $definition->getParameters());
|
||||
|
||||
// total count
|
||||
$countQueryBuilder = clone $queryBuilder;
|
||||
$countQueryBuilder->select('COUNT(sts.subscriber) AS subscriberCount');
|
||||
$totalCount = intval($countQueryBuilder->getQuery()->getSingleScalarResult());
|
||||
|
||||
// Sent count
|
||||
$sentCountQuery = clone $queryBuilder;
|
||||
$sentCountQuery->select('COUNT(sts.subscriber) AS subscriberCount');
|
||||
$sentCountQuery->andWhere('sts.processed = :processedStatus');
|
||||
$sentCountQuery->andWhere('sts.failed = :failedStatus');
|
||||
$sentCountQuery->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_PROCESSED);
|
||||
$sentCountQuery->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_OK);
|
||||
$sentCount = intval($sentCountQuery->getQuery()->getSingleScalarResult());
|
||||
|
||||
// Failed count
|
||||
$failedCountQuery = clone $queryBuilder;
|
||||
$failedCountQuery->select('COUNT(sts.subscriber) AS subscriberCount');
|
||||
$failedCountQuery->andWhere('sts.failed = :failedStatus');
|
||||
$failedCountQuery->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED);
|
||||
$failedCount = intval($failedCountQuery->getQuery()->getSingleScalarResult());
|
||||
|
||||
// Unprocessed count
|
||||
$unprocessedCountQuery = clone $queryBuilder;
|
||||
$unprocessedCountQuery->select('COUNT(sts.subscriber) AS subscriberCount');
|
||||
$unprocessedCountQuery->andWhere('sts.processed = :processedStatus');
|
||||
$unprocessedCountQuery->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
|
||||
$unprocessedCount = intval($unprocessedCountQuery->getQuery()->getSingleScalarResult());
|
||||
|
||||
return [
|
||||
[
|
||||
'name' => 'all',
|
||||
'label' => __('All', 'mailpoet'),
|
||||
'count' => $totalCount,
|
||||
],
|
||||
[
|
||||
'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT,
|
||||
'label' => __('Sent', 'mailpoet'),
|
||||
'count' => $sentCount,
|
||||
],
|
||||
[
|
||||
'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED,
|
||||
'label' => __('Failed', 'mailpoet'),
|
||||
'count' => $failedCount,
|
||||
],
|
||||
[
|
||||
'name' => ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED,
|
||||
'label' => __('Unprocessed', 'mailpoet'),
|
||||
'count' => $unprocessedCount,
|
||||
],
|
||||
];
|
||||
}
|
||||
|
||||
protected function applySelectClause(QueryBuilder $queryBuilder) {
|
||||
$queryBuilder->select("PARTIAL sts.{task,subscriber,processed,failed,error,createdAt,updatedAt}, PARTIAL s.{id, email, firstName, lastName}");
|
||||
}
|
||||
|
||||
protected function applyFromClause(QueryBuilder $queryBuilder) {
|
||||
$queryBuilder->from(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->leftJoin('sts.subscriber', 's');
|
||||
}
|
||||
|
||||
protected function applyGroup(QueryBuilder $queryBuilder, string $group) {
|
||||
if ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_SENT) {
|
||||
$queryBuilder->andWhere('sts.processed = :processedStatus');
|
||||
$queryBuilder->andWhere('sts.failed = :failedStatus');
|
||||
$queryBuilder->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_PROCESSED);
|
||||
$queryBuilder->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_OK);
|
||||
} elseif ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_FAILED) {
|
||||
$queryBuilder->andWhere('sts.failed = :failedStatus');
|
||||
$queryBuilder->setParameter('failedStatus', ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED);
|
||||
} elseif ($group === ScheduledTaskSubscriberEntity::SENDING_STATUS_UNPROCESSED) {
|
||||
$queryBuilder->andWhere('sts.processed = :processedStatus');
|
||||
$queryBuilder->setParameter('processedStatus', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
|
||||
}
|
||||
}
|
||||
|
||||
protected function applySorting(QueryBuilder $queryBuilder, string $sortBy, string $sortOrder) {
|
||||
// ScheduledTaskSubscriber doesn't have id column so the default fallback value 'id'
|
||||
// generated in MailPoet\Listing\Handler needs to be changed to something else
|
||||
if ($sortBy === 'id') {
|
||||
$sortBy = 'sts.subscriber';
|
||||
} elseif ($sortBy === 'subscriberId') { // Ordering by subscriberId is mapped to email for consistency with Subscriber listing
|
||||
$sortBy = 's.email';
|
||||
} else {
|
||||
$sortBy = "sts.{$sortBy}";
|
||||
}
|
||||
$queryBuilder->addOrderBy($sortBy, $sortOrder);
|
||||
}
|
||||
|
||||
protected function applySearch(QueryBuilder $queryBuilder, string $search, array $parameters = []) {
|
||||
$search = Helpers::escapeSearch($search);
|
||||
$queryBuilder
|
||||
->andWhere('s.email LIKE :search or s.firstName LIKE :search or s.lastName LIKE :search')
|
||||
->setParameter('search', "%$search%");
|
||||
}
|
||||
|
||||
protected function applyFilters(QueryBuilder $queryBuilder, array $filters) {
|
||||
// the parent class requires this method, but scheduled task subscribers listing doesn't currently support this feature.
|
||||
}
|
||||
|
||||
protected function applyParameters(QueryBuilder $queryBuilder, array $parameters) {
|
||||
if (isset($parameters['task_ids']) && !empty($parameters['task_ids'])) {
|
||||
$queryBuilder->andWhere('sts.task IN (:taskIds)')
|
||||
->setParameter('taskIds', $parameters['task_ids']);
|
||||
}
|
||||
}
|
||||
|
||||
public function getCount(ListingDefinition $definition): int {
|
||||
$queryBuilder = clone $this->queryBuilder;
|
||||
$this->applyFromClause($queryBuilder);
|
||||
$this->applyConstraints($queryBuilder, $definition);
|
||||
$queryBuilder->select("COUNT(DISTINCT sts.subscriber)");
|
||||
return intval($queryBuilder->getQuery()->getSingleScalarResult());
|
||||
}
|
||||
}
|
||||
+229
@@ -0,0 +1,229 @@
|
||||
<?php declare(strict_types = 1);
|
||||
|
||||
namespace MailPoet\Newsletter\Sending;
|
||||
|
||||
if (!defined('ABSPATH')) exit;
|
||||
|
||||
|
||||
use MailPoet\Doctrine\Repository;
|
||||
use MailPoet\Entities\ScheduledTaskEntity;
|
||||
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||
use MailPoet\Entities\SubscriberEntity;
|
||||
use MailPoet\InvalidStateException;
|
||||
use MailPoetVendor\Carbon\Carbon;
|
||||
use MailPoetVendor\Doctrine\DBAL\ArrayParameterType;
|
||||
use MailPoetVendor\Doctrine\ORM\QueryBuilder;
|
||||
|
||||
/**
|
||||
* @extends Repository<ScheduledTaskSubscriberEntity>
|
||||
*/
|
||||
class ScheduledTaskSubscribersRepository extends Repository {
|
||||
protected function getEntityClassName() {
|
||||
return ScheduledTaskSubscriberEntity::class;
|
||||
}
|
||||
|
||||
public function isSubscriberProcessed(ScheduledTaskEntity $task, SubscriberEntity $subscriber): bool {
|
||||
$scheduledTaskSubscriber = $this
|
||||
->doctrineRepository
|
||||
->createQueryBuilder('sts')
|
||||
->andWhere('sts.processed = 1')
|
||||
->andWhere('sts.task = :task')
|
||||
->andWhere('sts.subscriber = :subscriber')
|
||||
->setParameter('subscriber', $subscriber)
|
||||
->setParameter('task', $task)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
return !empty($scheduledTaskSubscriber);
|
||||
}
|
||||
|
||||
public function createOrUpdate(array $data): ?ScheduledTaskSubscriberEntity {
|
||||
if (!isset($data['task_id'], $data['subscriber_id'])) {
|
||||
return null;
|
||||
}
|
||||
|
||||
$taskSubscriber = $this->findOneBy(['task' => $data['task_id'], 'subscriber' => $data['subscriber_id']]);
|
||||
if (!$taskSubscriber) {
|
||||
$task = $this->entityManager->getReference(ScheduledTaskEntity::class, (int)$data['task_id']);
|
||||
$subscriber = $this->entityManager->getReference(SubscriberEntity::class, (int)$data['subscriber_id']);
|
||||
if (!$task || !$subscriber) throw new InvalidStateException();
|
||||
|
||||
$taskSubscriber = new ScheduledTaskSubscriberEntity($task, $subscriber);
|
||||
$this->persist($taskSubscriber);
|
||||
}
|
||||
|
||||
$processed = $data['processed'] ?? ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED;
|
||||
$failed = $data['failed'] ?? ScheduledTaskSubscriberEntity::FAIL_STATUS_OK;
|
||||
|
||||
$taskSubscriber->setProcessed($processed);
|
||||
$taskSubscriber->setFailed($failed);
|
||||
$this->flush();
|
||||
return $taskSubscriber;
|
||||
}
|
||||
|
||||
public function countSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId): int {
|
||||
$queryBuilder = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId);
|
||||
$countSubscribers = $queryBuilder
|
||||
->select('count(sts.subscriber)')
|
||||
->getQuery()
|
||||
->getSingleScalarResult();
|
||||
|
||||
return intval($countSubscribers);
|
||||
}
|
||||
|
||||
public function getSubscriberIdsBatchForTask(int $taskId, int $lastProcessedSubscriberId, int $limit): array {
|
||||
$queryBuilder = $this->getBaseSubscribersIdsBatchForTaskQuery($taskId, $lastProcessedSubscriberId);
|
||||
$subscribersIds = $queryBuilder
|
||||
->select('IDENTITY(sts.subscriber) AS subscriber_id')
|
||||
->orderBy('sts.subscriber', 'asc')
|
||||
->setMaxResults($limit)
|
||||
->getQuery()
|
||||
->getSingleColumnResult();
|
||||
|
||||
return $subscribersIds;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int[] $subscriberIds
|
||||
*/
|
||||
public function updateProcessedSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
|
||||
if ($subscriberIds) {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->update(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->set('sts.processed', ScheduledTaskSubscriberEntity::STATUS_PROCESSED)
|
||||
->where('sts.subscriber IN (:subscriberIds)')
|
||||
->andWhere('sts.task = :task')
|
||||
->setParameter('subscriberIds', $subscriberIds, ArrayParameterType::INTEGER)
|
||||
->setParameter('task', $task)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// update was done via DQL, make sure the entities are also refreshed in the entity manager
|
||||
$this->refreshAll(function (ScheduledTaskSubscriberEntity $entity) use ($task, $subscriberIds) {
|
||||
return $entity->getTask() === $task && in_array($entity->getSubscriberId(), $subscriberIds, true);
|
||||
});
|
||||
}
|
||||
|
||||
$this->checkCompleted($task);
|
||||
}
|
||||
|
||||
public function createSubscribersForBounceWorker(ScheduledTaskEntity $scheduledTaskEntity): void {
|
||||
$scheduledTaskSubscribersTable = $this->entityManager->getClassMetadata(ScheduledTaskSubscriberEntity::class)->getTableName();
|
||||
$subscribersTable = $this->entityManager->getClassMetadata(SubscriberEntity::class)->getTableName();
|
||||
|
||||
$stmt = $this->entityManager->getConnection()->prepare("
|
||||
INSERT IGNORE INTO " . $scheduledTaskSubscribersTable . "
|
||||
(task_id, subscriber_id, processed)
|
||||
SELECT :taskId AS task_id, s.`id` AS subscriber_id, :unprocessed AS processed
|
||||
FROM " . $subscribersTable . " s
|
||||
WHERE s.`deleted_at` IS NULL
|
||||
AND s.`status` IN (:subscribed, :unconfirmed)
|
||||
");
|
||||
$stmt->bindValue('taskId', $scheduledTaskEntity->getId());
|
||||
$stmt->bindValue('unprocessed', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
|
||||
$stmt->bindValue('subscribed', SubscriberEntity::STATUS_SUBSCRIBED);
|
||||
$stmt->bindValue('unconfirmed', SubscriberEntity::STATUS_UNCONFIRMED);
|
||||
$stmt->executeQuery();
|
||||
}
|
||||
|
||||
/** @param int[] $ids */
|
||||
public function deleteByTaskIds(array $ids): void {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->delete(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->where('sts.task IN (:taskIds)')
|
||||
->setParameter('taskIds', $ids)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// delete was done via DQL, make sure the entities are also detached from the entity manager
|
||||
$this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($ids) {
|
||||
$task = $entity->getTask();
|
||||
return $task && in_array($task->getId(), $ids, true);
|
||||
});
|
||||
}
|
||||
|
||||
public function deleteByScheduledTask(ScheduledTaskEntity $scheduledTask): void {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->delete(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->where('sts.task = :task')
|
||||
->setParameter('task', $scheduledTask)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// delete was done via DQL, make sure the entities are also detached from the entity manager
|
||||
$this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($scheduledTask) {
|
||||
return $entity->getTask() === $scheduledTask;
|
||||
});
|
||||
}
|
||||
|
||||
public function deleteByScheduledTaskAndSubscriberIds(ScheduledTaskEntity $scheduledTask, array $subscriberIds): void {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->delete(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->where('sts.task = :task')
|
||||
->andWhere('sts.subscriber IN (:subscriberIds)')
|
||||
->setParameter('task', $scheduledTask)
|
||||
->setParameter('subscriberIds', $subscriberIds, ArrayParameterType::INTEGER)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// delete was done via DQL, make sure the entities are also detached from the entity manager
|
||||
$this->detachAll(function (ScheduledTaskSubscriberEntity $entity) use ($scheduledTask, $subscriberIds) {
|
||||
return $entity->getTask() === $scheduledTask && in_array($entity->getSubscriberId(), $subscriberIds, true);
|
||||
});
|
||||
|
||||
$this->checkCompleted($scheduledTask);
|
||||
}
|
||||
|
||||
public function setSubscribers(ScheduledTaskEntity $task, array $subscriberIds): void {
|
||||
$this->deleteByScheduledTask($task);
|
||||
|
||||
foreach ($subscriberIds as $subscriberId) {
|
||||
$this->createOrUpdate([
|
||||
'task_id' => $task->getId(),
|
||||
'subscriber_id' => $subscriberId,
|
||||
]);
|
||||
}
|
||||
}
|
||||
|
||||
public function saveError(ScheduledTaskEntity $scheduledTask, int $subscriberId, string $errorMessage): void {
|
||||
$scheduledTaskSubscriber = $this->findOneBy(['task' => $scheduledTask, 'subscriber' => $subscriberId]);
|
||||
|
||||
if ($scheduledTaskSubscriber instanceof ScheduledTaskSubscriberEntity) {
|
||||
$scheduledTaskSubscriber->setFailed(ScheduledTaskSubscriberEntity::FAIL_STATUS_FAILED);
|
||||
$scheduledTaskSubscriber->setProcessed(ScheduledTaskSubscriberEntity::STATUS_PROCESSED);
|
||||
$scheduledTaskSubscriber->setError($errorMessage);
|
||||
$this->persist($scheduledTaskSubscriber);
|
||||
$this->flush();
|
||||
|
||||
$this->checkCompleted($scheduledTask);
|
||||
}
|
||||
}
|
||||
|
||||
public function countProcessed(ScheduledTaskEntity $scheduledTaskEntity): int {
|
||||
return $this->countBy(['task' => $scheduledTaskEntity, 'processed' => ScheduledTaskSubscriberEntity::STATUS_PROCESSED]);
|
||||
}
|
||||
|
||||
public function countUnprocessed(ScheduledTaskEntity $scheduledTaskEntity): int {
|
||||
return $this->countBy(['task' => $scheduledTaskEntity, 'processed' => ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED]);
|
||||
}
|
||||
|
||||
private function checkCompleted(ScheduledTaskEntity $task): void {
|
||||
$count = $this->countUnprocessed($task);
|
||||
if ($count === 0) {
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
|
||||
$task->setProcessedAt(Carbon::now()->millisecond(0));
|
||||
$this->entityManager->flush();
|
||||
}
|
||||
}
|
||||
|
||||
private function getBaseSubscribersIdsBatchForTaskQuery(int $taskId, int $lastProcessedSubscriberId): QueryBuilder {
|
||||
return $this->entityManager
|
||||
->createQueryBuilder()
|
||||
->from(ScheduledTaskSubscriberEntity::class, 'sts')
|
||||
->andWhere('sts.task = :taskId')
|
||||
->andWhere('sts.subscriber > :lastProcessedSubscriberId')
|
||||
->andWhere('sts.processed = :status')
|
||||
->setParameter('taskId', $taskId)
|
||||
->setParameter('lastProcessedSubscriberId', $lastProcessedSubscriberId)
|
||||
->setParameter('status', ScheduledTaskSubscriberEntity::STATUS_UNPROCESSED);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,432 @@
|
||||
<?php // phpcs:ignore SlevomatCodingStandard.TypeHints.DeclareStrictTypes.DeclareStrictTypesMissing
|
||||
|
||||
namespace MailPoet\Newsletter\Sending;
|
||||
|
||||
if (!defined('ABSPATH')) exit;
|
||||
|
||||
|
||||
use MailPoet\Cron\Workers\SendingQueue\SendingQueue;
|
||||
use MailPoet\Doctrine\Repository;
|
||||
use MailPoet\Entities\NewsletterEntity;
|
||||
use MailPoet\Entities\ScheduledTaskEntity;
|
||||
use MailPoet\Entities\ScheduledTaskSubscriberEntity;
|
||||
use MailPoet\Entities\SendingQueueEntity;
|
||||
use MailPoet\Entities\SubscriberEntity;
|
||||
use MailPoetVendor\Carbon\Carbon;
|
||||
use MailPoetVendor\Doctrine\DBAL\ArrayParameterType;
|
||||
use MailPoetVendor\Doctrine\ORM\EntityManager;
|
||||
use MailPoetVendor\Doctrine\ORM\Query\Expr\Join;
|
||||
|
||||
/**
|
||||
* @extends Repository<ScheduledTaskEntity>
|
||||
*/
|
||||
class ScheduledTasksRepository extends Repository {
|
||||
const TASK_BATCH_SIZE = 20;
|
||||
const CANCELLABLE_STATUSES = [
|
||||
ScheduledTaskEntity::STATUS_SCHEDULED,
|
||||
ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING,
|
||||
null,
|
||||
];
|
||||
|
||||
private SendingQueuesRepository $sendingQueuesRepository;
|
||||
|
||||
public function __construct(
|
||||
EntityManager $entityManager,
|
||||
SendingQueuesRepository $sendingQueuesRepository
|
||||
) {
|
||||
$this->sendingQueuesRepository = $sendingQueuesRepository;
|
||||
parent::__construct($entityManager);
|
||||
}
|
||||
|
||||
/**
|
||||
* @param NewsletterEntity $newsletter
|
||||
* @return ScheduledTaskEntity[]
|
||||
*/
|
||||
public function findByNewsletterAndStatus(NewsletterEntity $newsletter, string $status): array {
|
||||
return $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
|
||||
->andWhere('st.status = :status')
|
||||
->andWhere('sq.newsletter = :newsletter')
|
||||
->setParameter('status', $status)
|
||||
->setParameter('newsletter', $newsletter)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param NewsletterEntity $newsletter
|
||||
*/
|
||||
public function findOneByNewsletter(NewsletterEntity $newsletter): ?ScheduledTaskEntity {
|
||||
$scheduledTask = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
|
||||
->andWhere('sq.newsletter = :newsletter')
|
||||
->orderBy('sq.updatedAt', 'desc')
|
||||
->setMaxResults(1)
|
||||
->setParameter('newsletter', $newsletter)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
// for phpstan because it detects mixed instead of entity
|
||||
return ($scheduledTask instanceof ScheduledTaskEntity) ? $scheduledTask : null;
|
||||
}
|
||||
|
||||
public function findOneBySendingQueue(SendingQueueEntity $sendingQueue): ?ScheduledTaskEntity {
|
||||
$scheduledTask = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
|
||||
->andWhere('sq.id = :sendingQueue')
|
||||
->setMaxResults(1)
|
||||
->setParameter('sendingQueue', $sendingQueue)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
// for phpstan because it detects mixed instead of entity
|
||||
return ($scheduledTask instanceof ScheduledTaskEntity) ? $scheduledTask : null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param NewsletterEntity $newsletter
|
||||
* @return ScheduledTaskEntity[]
|
||||
*/
|
||||
public function findByScheduledAndRunningForNewsletter(NewsletterEntity $newsletter): array {
|
||||
return $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
|
||||
->andWhere('st.status = :status OR st.status IS NULL')
|
||||
->andWhere('sq.newsletter = :newsletter')
|
||||
->setParameter('status', NewsletterEntity::STATUS_SCHEDULED)
|
||||
->setParameter('newsletter', $newsletter)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param NewsletterEntity $newsletter
|
||||
* @return ScheduledTaskEntity[]
|
||||
*/
|
||||
public function findByNewsletterAndSubscriberId(NewsletterEntity $newsletter, int $subscriberId): array {
|
||||
return $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
|
||||
->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task')
|
||||
->andWhere('sq.newsletter = :newsletter')
|
||||
->andWhere('sts.subscriber = :subscriber')
|
||||
->setParameter('newsletter', $newsletter)
|
||||
->setParameter('subscriber', $subscriberId)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
}
|
||||
|
||||
public function findOneScheduledByNewsletterAndSubscriber(NewsletterEntity $newsletter, SubscriberEntity $subscriber): ?ScheduledTaskEntity {
|
||||
$scheduledTask = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->join(SendingQueueEntity::class, 'sq', Join::WITH, 'st = sq.task')
|
||||
->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task')
|
||||
->andWhere('st.status = :status')
|
||||
->andWhere('sq.newsletter = :newsletter')
|
||||
->andWhere('sts.subscriber = :subscriber')
|
||||
->setMaxResults(1)
|
||||
->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
|
||||
->setParameter('newsletter', $newsletter)
|
||||
->setParameter('subscriber', $subscriber)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
// for phpstan because it detects mixed instead of entity
|
||||
return ($scheduledTask instanceof ScheduledTaskEntity) ? $scheduledTask : null;
|
||||
}
|
||||
|
||||
public function findScheduledOrRunningTask(?string $type): ?ScheduledTaskEntity {
|
||||
$queryBuilder = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->where('((st.status = :scheduledStatus) OR (st.status is NULL))')
|
||||
->andWhere('st.deletedAt IS NULL')
|
||||
->setParameter('scheduledStatus', ScheduledTaskEntity::STATUS_SCHEDULED)
|
||||
->setMaxResults(1)
|
||||
->orderBy('st.scheduledAt', 'DESC');
|
||||
if (!empty($type)) {
|
||||
$queryBuilder
|
||||
->andWhere('st.type = :type')
|
||||
->setParameter('type', $type);
|
||||
}
|
||||
return $queryBuilder->getQuery()->getOneOrNullResult();
|
||||
}
|
||||
|
||||
public function findScheduledTask(?string $type): ?ScheduledTaskEntity {
|
||||
$queryBuilder = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->where('st.status = :scheduledStatus')
|
||||
->andWhere('st.deletedAt IS NULL')
|
||||
->setParameter('scheduledStatus', ScheduledTaskEntity::STATUS_SCHEDULED)
|
||||
->setMaxResults(1)
|
||||
->orderBy('st.scheduledAt', 'DESC');
|
||||
if (!empty($type)) {
|
||||
$queryBuilder
|
||||
->andWhere('st.type = :type')
|
||||
->setParameter('type', $type);
|
||||
}
|
||||
return $queryBuilder->getQuery()->getOneOrNullResult();
|
||||
}
|
||||
|
||||
public function findPreviousTask(ScheduledTaskEntity $task): ?ScheduledTaskEntity {
|
||||
return $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->where('st.type = :type')
|
||||
->setParameter('type', $task->getType())
|
||||
->andWhere('st.createdAt < :created')
|
||||
->setParameter('created', $task->getCreatedAt())
|
||||
->orderBy('st.scheduledAt', 'DESC')
|
||||
->setMaxResults(1)
|
||||
->getQuery()
|
||||
->getOneOrNullResult();
|
||||
}
|
||||
|
||||
public function findDueByType($type, $limit = null) {
|
||||
return $this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_SCHEDULED, $limit);
|
||||
}
|
||||
|
||||
public function findRunningByType($type, $limit = null) {
|
||||
return $this->findByTypeAndStatus($type, null, $limit);
|
||||
}
|
||||
|
||||
public function findCompletedByType($type, $limit = null) {
|
||||
return $this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_COMPLETED, $limit);
|
||||
}
|
||||
|
||||
public function findFutureScheduledByType($type, $limit = null) {
|
||||
return $this->findByTypeAndStatus($type, ScheduledTaskEntity::STATUS_SCHEDULED, $limit, true);
|
||||
}
|
||||
|
||||
public function getCountsPerStatus(string $type = 'sending') {
|
||||
$stats = [
|
||||
ScheduledTaskEntity::STATUS_COMPLETED => 0,
|
||||
ScheduledTaskEntity::STATUS_PAUSED => 0,
|
||||
ScheduledTaskEntity::STATUS_SCHEDULED => 0,
|
||||
ScheduledTaskEntity::STATUS_CANCELLED => 0,
|
||||
ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING => 0,
|
||||
];
|
||||
|
||||
$counts = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('COUNT(st.id) as value')
|
||||
->addSelect('st.status')
|
||||
->where('st.deletedAt IS NULL')
|
||||
->andWhere('st.type = :type')
|
||||
->setParameter('type', $type)
|
||||
->addGroupBy('st.status')
|
||||
->getQuery()
|
||||
->getResult();
|
||||
|
||||
foreach ($counts as $count) {
|
||||
if ($count['status'] === null) {
|
||||
$stats[ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING] = (int)$count['value'];
|
||||
continue;
|
||||
}
|
||||
$stats[$count['status']] = (int)$count['value'];
|
||||
}
|
||||
return $stats;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string|null $type
|
||||
* @param array $statuses
|
||||
* @param int $limit
|
||||
* @return array<ScheduledTaskEntity>
|
||||
*/
|
||||
public function getLatestTasks(
|
||||
$type = null,
|
||||
$statuses = [
|
||||
ScheduledTaskEntity::STATUS_COMPLETED,
|
||||
ScheduledTaskEntity::STATUS_CANCELLED,
|
||||
ScheduledTaskEntity::STATUS_SCHEDULED,
|
||||
ScheduledTaskEntity::STATUS_PAUSED,
|
||||
ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING,
|
||||
],
|
||||
$limit = self::TASK_BATCH_SIZE
|
||||
) {
|
||||
$result = [];
|
||||
foreach ($statuses as $status) {
|
||||
$tasksQuery = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->where('st.deletedAt IS NULL');
|
||||
|
||||
if ($status === ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING) {
|
||||
$tasksQuery = $tasksQuery->andWhere('st.status = :status OR st.status IS NULL');
|
||||
} else {
|
||||
$tasksQuery = $tasksQuery->andWhere('st.status = :status');
|
||||
}
|
||||
|
||||
if ($type) {
|
||||
$tasksQuery = $tasksQuery->andWhere('st.type = :type')
|
||||
->setParameter('type', $type);
|
||||
}
|
||||
|
||||
$tasks = $tasksQuery
|
||||
->setParameter('status', $status)
|
||||
->setMaxResults($limit)
|
||||
->orderBy('st.id', 'desc')
|
||||
->getQuery()
|
||||
->getResult();
|
||||
$result = array_merge($result, $tasks);
|
||||
}
|
||||
|
||||
return $result;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ScheduledTaskEntity[]
|
||||
*/
|
||||
public function findRunningSendingTasks(?int $limit = null): array {
|
||||
return $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->join('st.sendingQueue', 'sq')
|
||||
->where('st.type = :type')
|
||||
->andWhere('st.status IS NULL')
|
||||
->andWhere('st.deletedAt IS NULL')
|
||||
->orderBy('st.priority', 'ASC')
|
||||
->addOrderBy('st.updatedAt', 'ASC')
|
||||
->setMaxResults($limit)
|
||||
->setParameter('type', SendingQueue::TASK_TYPE)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $type
|
||||
* @param SubscriberEntity $subscriber
|
||||
* @return ScheduledTaskEntity[]
|
||||
* @throws \MailPoetVendor\Doctrine\ORM\NonUniqueResultException
|
||||
*/
|
||||
public function findByTypeAndSubscriber(string $type, SubscriberEntity $subscriber): array {
|
||||
$query = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->join(ScheduledTaskSubscriberEntity::class, 'sts', Join::WITH, 'st = sts.task')
|
||||
->where('st.type = :type')
|
||||
->andWhere('sts.subscriber = :subscriber')
|
||||
->andWhere('st.deletedAt IS NULL')
|
||||
->andWhere('st.status = :status')
|
||||
->setParameter('type', $type)
|
||||
->setParameter('subscriber', $subscriber->getId())
|
||||
->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
|
||||
->getQuery();
|
||||
$tasks = $query->getResult();
|
||||
return $tasks;
|
||||
}
|
||||
|
||||
public function touchAllByIds(array $ids): void {
|
||||
$now = Carbon::now()->millisecond(0);
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->update(ScheduledTaskEntity::class, 'st')
|
||||
->set('st.updatedAt', ':updatedAt')
|
||||
->setParameter('updatedAt', $now)
|
||||
->where('st.id IN (:ids)')
|
||||
->setParameter('ids', $ids, ArrayParameterType::INTEGER)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// update was done via DQL, make sure the entities are also refreshed in the entity manager
|
||||
$this->refreshAll(function (ScheduledTaskEntity $entity) use ($ids) {
|
||||
return in_array($entity->getId(), $ids, true);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @return ScheduledTaskEntity[]
|
||||
*/
|
||||
public function findScheduledSendingTasks(?int $limit = null): array {
|
||||
$now = Carbon::now()->millisecond(0);
|
||||
return $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->join('st.sendingQueue', 'sq')
|
||||
->where('st.deletedAt IS NULL')
|
||||
->andWhere('st.status = :status')
|
||||
->andWhere('st.scheduledAt <= :now')
|
||||
->andWhere('st.type = :type')
|
||||
->orderBy('st.updatedAt', 'ASC')
|
||||
->setMaxResults($limit)
|
||||
->setParameter('status', ScheduledTaskEntity::STATUS_SCHEDULED)
|
||||
->setParameter('now', $now)
|
||||
->setParameter('type', SendingQueue::TASK_TYPE)
|
||||
->getQuery()
|
||||
->getResult();
|
||||
}
|
||||
|
||||
public function invalidateTask(ScheduledTaskEntity $task): void {
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_INVALID);
|
||||
$this->persist($task);
|
||||
$this->flush();
|
||||
}
|
||||
|
||||
public function cancelTask(ScheduledTaskEntity $task): void {
|
||||
if (!in_array($task->getStatus(), self::CANCELLABLE_STATUSES)) {
|
||||
throw new \Exception(__('Only scheduled and running tasks can be cancelled', 'mailpoet'), 400);
|
||||
}
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_CANCELLED);
|
||||
$task->setCancelledAt(Carbon::now()->millisecond(0));
|
||||
$this->persist($task);
|
||||
$this->flush();
|
||||
}
|
||||
|
||||
public function rescheduleTask(ScheduledTaskEntity $task): void {
|
||||
if ($task->getStatus() !== ScheduledTaskEntity::STATUS_CANCELLED) {
|
||||
throw new \Exception(__('Only cancelled tasks can be rescheduled', 'mailpoet'), 400);
|
||||
}
|
||||
if ($task->getScheduledAt() <= Carbon::now()->millisecond(0)) {
|
||||
$task->setStatus(ScheduledTaskEntity::VIRTUAL_STATUS_RUNNING);
|
||||
$queue = $task->getSendingQueue();
|
||||
if ($queue) {
|
||||
$this->sendingQueuesRepository->resume($queue);
|
||||
}
|
||||
} else {
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_SCHEDULED);
|
||||
}
|
||||
$task->setCancelledAt(null);
|
||||
$this->persist($task);
|
||||
$this->flush();
|
||||
}
|
||||
|
||||
/** @param int[] $ids */
|
||||
public function deleteByIds(array $ids): void {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->delete(ScheduledTaskEntity::class, 't')
|
||||
->where('t.id IN (:ids)')
|
||||
->setParameter('ids', $ids)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// delete was done via DQL, make sure the entities are also detached from the entity manager
|
||||
$this->detachAll(function (ScheduledTaskEntity $entity) use ($ids) {
|
||||
return in_array($entity->getId(), $ids, true);
|
||||
});
|
||||
}
|
||||
|
||||
protected function findByTypeAndStatus($type, $status, $limit = null, $future = false) {
|
||||
$queryBuilder = $this->doctrineRepository->createQueryBuilder('st')
|
||||
->select('st')
|
||||
->where('st.type = :type')
|
||||
->setParameter('type', $type)
|
||||
->andWhere('st.deletedAt IS NULL');
|
||||
|
||||
if (is_null($status)) {
|
||||
$queryBuilder->andWhere('st.status IS NULL');
|
||||
} else {
|
||||
$queryBuilder
|
||||
->andWhere('st.status = :status')
|
||||
->setParameter('status', $status);
|
||||
}
|
||||
|
||||
if ($future) {
|
||||
$queryBuilder->andWhere('st.scheduledAt > :now');
|
||||
} else {
|
||||
$queryBuilder->andWhere('st.scheduledAt <= :now');
|
||||
}
|
||||
|
||||
$now = Carbon::now()->millisecond(0);
|
||||
$queryBuilder->setParameter('now', $now);
|
||||
|
||||
if ($limit) {
|
||||
$queryBuilder->setMaxResults($limit);
|
||||
}
|
||||
|
||||
return $queryBuilder->getQuery()->getResult();
|
||||
}
|
||||
|
||||
protected function getEntityClassName() {
|
||||
return ScheduledTaskEntity::class;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,292 @@
|
||||
<?php declare(strict_types = 1);
|
||||
|
||||
namespace MailPoet\Newsletter\Sending;
|
||||
|
||||
if (!defined('ABSPATH')) exit;
|
||||
|
||||
|
||||
use MailPoet\Doctrine\Repository;
|
||||
use MailPoet\Entities\DynamicSegmentFilterEntity;
|
||||
use MailPoet\Entities\NewsletterEntity;
|
||||
use MailPoet\Entities\ScheduledTaskEntity;
|
||||
use MailPoet\Entities\SegmentEntity;
|
||||
use MailPoet\Entities\SendingQueueEntity;
|
||||
use MailPoet\Entities\SubscriberEntity;
|
||||
use MailPoet\Logging\LoggerFactory;
|
||||
use MailPoet\Segments\DynamicSegments\FilterFactory;
|
||||
use MailPoetVendor\Carbon\Carbon;
|
||||
use MailPoetVendor\Doctrine\ORM\EntityManager;
|
||||
|
||||
/**
|
||||
* @extends Repository<SendingQueueEntity>
|
||||
*/
|
||||
class SendingQueuesRepository extends Repository {
|
||||
/** @var ScheduledTaskSubscribersRepository */
|
||||
private $scheduledTaskSubscribersRepository;
|
||||
|
||||
/** @var FilterFactory */
|
||||
private $filterFactory;
|
||||
|
||||
/** @var LoggerFactory */
|
||||
private $loggerFactory;
|
||||
|
||||
public function __construct(
|
||||
EntityManager $entityManager,
|
||||
ScheduledTaskSubscribersRepository $scheduledTaskSubscribersRepository,
|
||||
FilterFactory $filterFactory,
|
||||
LoggerFactory $loggerFactory
|
||||
) {
|
||||
parent::__construct($entityManager);
|
||||
$this->scheduledTaskSubscribersRepository = $scheduledTaskSubscribersRepository;
|
||||
$this->filterFactory = $filterFactory;
|
||||
$this->loggerFactory = $loggerFactory;
|
||||
}
|
||||
|
||||
protected function getEntityClassName() {
|
||||
return SendingQueueEntity::class;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param NewsletterEntity $newsletter
|
||||
* @param string|null $status
|
||||
* @return SendingQueueEntity|null
|
||||
* @throws \MailPoetVendor\Doctrine\ORM\NonUniqueResultException
|
||||
*/
|
||||
public function findOneByNewsletterAndTaskStatus(NewsletterEntity $newsletter, $status): ?SendingQueueEntity {
|
||||
$queryBuilder = $this->entityManager->createQueryBuilder()
|
||||
->select('s')
|
||||
->from(SendingQueueEntity::class, 's')
|
||||
->join('s.task', 't')
|
||||
->andWhere('s.newsletter = :newsletter')
|
||||
->setParameter('newsletter', $newsletter);
|
||||
|
||||
if (is_null($status)) {
|
||||
$queryBuilder->andWhere('t.status IS NULL');
|
||||
} else {
|
||||
$queryBuilder->andWhere('t.status = :status')
|
||||
->setParameter('status', $status);
|
||||
}
|
||||
|
||||
return $queryBuilder->getQuery()->getOneOrNullResult();
|
||||
}
|
||||
|
||||
public function countAllToProcessByNewsletter(NewsletterEntity $newsletter): int {
|
||||
return intval($this->entityManager->createQueryBuilder()
|
||||
->select('sum(s.countToProcess)')
|
||||
->from(SendingQueueEntity::class, 's')
|
||||
->andWhere('s.newsletter = :newsletter')
|
||||
->setParameter('newsletter', $newsletter)
|
||||
->getQuery()
|
||||
->getSingleScalarResult());
|
||||
}
|
||||
|
||||
public function getTaskIdsByNewsletterId(int $newsletterId): array {
|
||||
$results = $this->entityManager->createQueryBuilder()
|
||||
->select('IDENTITY(s.task) as task_id')
|
||||
->from(SendingQueueEntity::class, 's')
|
||||
->andWhere('s.newsletter = :newsletter')
|
||||
->setParameter('newsletter', $newsletterId)
|
||||
->getQuery()
|
||||
->getArrayResult();
|
||||
return array_map('intval', array_column($results, 'task_id'));
|
||||
}
|
||||
|
||||
public function isSubscriberProcessed(SendingQueueEntity $queue, SubscriberEntity $subscriber): bool {
|
||||
$task = $queue->getTask();
|
||||
if (is_null($task)) return false;
|
||||
return $this->scheduledTaskSubscribersRepository->isSubscriberProcessed($task, $subscriber);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return SendingQueueEntity[]
|
||||
*/
|
||||
public function findAllForSubscriberSentBetween(
|
||||
SubscriberEntity $subscriber,
|
||||
?\DateTimeInterface $dateTo,
|
||||
?\DateTimeInterface $dateFrom
|
||||
): array {
|
||||
$qb = $this->entityManager->createQueryBuilder()
|
||||
->select('s, n')
|
||||
->from(SendingQueueEntity::class, 's')
|
||||
->join('s.task', 't')
|
||||
->join('t.subscribers', 'tsub')
|
||||
->join('s.newsletter', 'n')
|
||||
->where('t.status = :status')
|
||||
->setParameter('status', ScheduledTaskEntity::STATUS_COMPLETED)
|
||||
->andWhere('t.type = :sendingType')
|
||||
->setParameter('sendingType', 'sending')
|
||||
->andWhere('tsub.subscriber = :subscriber')
|
||||
->setParameter('subscriber', $subscriber);
|
||||
if ($dateTo) {
|
||||
$qb->andWhere('t.updatedAt < :dateTo')
|
||||
->setParameter('dateTo', $dateTo);
|
||||
}
|
||||
if ($dateFrom) {
|
||||
$qb->andWhere('t.updatedAt > :dateFrom')
|
||||
->setParameter('dateFrom', $dateFrom);
|
||||
}
|
||||
return $qb->getQuery()->getResult();
|
||||
}
|
||||
|
||||
public function getCampaignAnalyticsQuery() {
|
||||
$sevenDaysAgo = Carbon::now()->subDays(7);
|
||||
$thirtyDaysAgo = Carbon::now()->subDays(30);
|
||||
$threeMonthsAgo = Carbon::now()->subMonths(3);
|
||||
|
||||
return $this->doctrineRepository->createQueryBuilder('q')
|
||||
->select('
|
||||
n.type as newsletterType,
|
||||
q.meta as sendingQueueMeta,
|
||||
CASE
|
||||
WHEN COUNT(s.id) > 0 THEN true
|
||||
ELSE false
|
||||
END as sentToSegment,
|
||||
CASE
|
||||
WHEN t.processedAt >= :sevenDaysAgo THEN true
|
||||
ELSE false
|
||||
END as sentLast7Days,
|
||||
CASE
|
||||
WHEN t.processedAt >= :thirtyDaysAgo THEN true
|
||||
ELSE false
|
||||
END as sentLast30Days,
|
||||
CASE
|
||||
WHEN t.processedAt >= :threeMonthsAgo THEN true
|
||||
ELSE false
|
||||
END as sentLast3Months')
|
||||
->join('q.task', 't')
|
||||
->leftJoin('q.newsletter', 'n')
|
||||
->leftJoin('n.newsletterSegments', 'ns')
|
||||
->leftJoin('ns.segment', 's', 'WITH', 's.type = :dynamicType')
|
||||
->andWhere('t.status = :taskStatus')
|
||||
->andWhere('t.processedAt >= :since')
|
||||
->setParameter('sevenDaysAgo', $sevenDaysAgo)
|
||||
->setParameter('thirtyDaysAgo', $thirtyDaysAgo)
|
||||
->setParameter('threeMonthsAgo', $threeMonthsAgo)
|
||||
->setParameter('dynamicType', SegmentEntity::TYPE_DYNAMIC)
|
||||
->setParameter('taskStatus', ScheduledTaskEntity::STATUS_COMPLETED)
|
||||
->setParameter('since', $threeMonthsAgo)
|
||||
->groupBy('q.id')
|
||||
->getQuery();
|
||||
}
|
||||
|
||||
public function pause(SendingQueueEntity $queue): void {
|
||||
if ($queue->getCountProcessed() !== $queue->getCountTotal()) {
|
||||
$task = $queue->getTask();
|
||||
if ($task instanceof ScheduledTaskEntity) {
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_PAUSED);
|
||||
$this->flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public function resume(SendingQueueEntity $queue): void {
|
||||
$task = $queue->getTask();
|
||||
if (!$task instanceof ScheduledTaskEntity) return;
|
||||
|
||||
if ($queue->getCountProcessed() === $queue->getCountTotal()) {
|
||||
$processedAt = Carbon::now()->millisecond(0);
|
||||
$task->setProcessedAt($processedAt);
|
||||
$task->setStatus(ScheduledTaskEntity::STATUS_COMPLETED);
|
||||
// Update also status of newsletter if necessary
|
||||
$newsletter = $queue->getNewsletter();
|
||||
if ($newsletter instanceof NewsletterEntity && $newsletter->canBeSetSent()) {
|
||||
$newsletter->setStatus(NewsletterEntity::STATUS_SENT);
|
||||
}
|
||||
$this->flush();
|
||||
} else {
|
||||
$newsletter = $queue->getNewsletter();
|
||||
if (!$newsletter instanceof NewsletterEntity) return;
|
||||
if ($newsletter->getStatus() === NewsletterEntity::STATUS_CORRUPT) { // force a re-render
|
||||
$queue->setNewsletterRenderedBody(null);
|
||||
$this->persist($queue);
|
||||
}
|
||||
$newsletter->setStatus($newsletter->canBeSetActive() ? NewsletterEntity::STATUS_ACTIVE : NewsletterEntity::STATUS_SENDING);
|
||||
$task->setStatus(null);
|
||||
$this->flush();
|
||||
}
|
||||
}
|
||||
|
||||
public function deleteByTask(ScheduledTaskEntity $scheduledTask): void {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->delete(SendingQueueEntity::class, 'sq')
|
||||
->where('sq.task = :task')
|
||||
->setParameter('task', $scheduledTask)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// delete was done via DQL, make sure the entities are also detached from the entity manager
|
||||
$this->detachAll(function (SendingQueueEntity $entity) use ($scheduledTask) {
|
||||
return $entity->getTask() === $scheduledTask;
|
||||
});
|
||||
}
|
||||
|
||||
public function saveCampaignId(SendingQueueEntity $queue, string $campaignId): void {
|
||||
$meta = $queue->getMeta();
|
||||
if (!is_array($meta)) {
|
||||
$meta = [];
|
||||
}
|
||||
$meta['campaignId'] = $campaignId;
|
||||
$queue->setMeta($meta);
|
||||
$this->flush();
|
||||
}
|
||||
|
||||
public function saveFilterSegmentMeta(SendingQueueEntity $queue, SegmentEntity $filterSegmentEntity): void {
|
||||
$meta = $queue->getMeta() ?? [];
|
||||
$meta['filterSegment'] = [
|
||||
'id' => $filterSegmentEntity->getId(),
|
||||
'name' => $filterSegmentEntity->getName(),
|
||||
'updatedAt' => $filterSegmentEntity->getUpdatedAt(),
|
||||
'filters' => array_map(function(DynamicSegmentFilterEntity $filterEntity) {
|
||||
$filter = $this->filterFactory->getFilterForFilterEntity($filterEntity);
|
||||
$data = $filterEntity->getFilterData();
|
||||
$filterData = [
|
||||
'filterType' => $data->getFilterType(),
|
||||
'action' => $data->getAction(),
|
||||
'data' => $filterEntity->getFilterData()->getData(),
|
||||
'lookupData' => [],
|
||||
];
|
||||
try {
|
||||
$filterData['lookupData'] = $filter->getLookupData($data);
|
||||
} catch (\Throwable $e) {
|
||||
$this->loggerFactory->getLogger(LoggerFactory::TOPIC_SEGMENTS)->error("Failed to save lookup data for filter {$filterEntity->getId()}: {$e->getMessage()}");
|
||||
}
|
||||
return $filterData;
|
||||
}, $filterSegmentEntity->getDynamicFilters()->toArray()),
|
||||
];
|
||||
$queue->setMeta($meta);
|
||||
$this->flush();
|
||||
}
|
||||
|
||||
public function updateCounts(SendingQueueEntity $queue, ?int $count = null): void {
|
||||
if ($count) {
|
||||
// increment/decrement counts based on known subscriber count, don't exceed the bounds
|
||||
$queue->setCountProcessed(min($queue->getCountProcessed() + $count, $queue->getCountTotal()));
|
||||
$queue->setCountToProcess(max($queue->getCountToProcess() - $count, 0));
|
||||
} else {
|
||||
// query DB to update counts, slower but more accurate, to be used if count isn't known
|
||||
$task = $queue->getTask();
|
||||
$processed = $task ? $this->scheduledTaskSubscribersRepository->countProcessed($task) : 0;
|
||||
$unprocessed = $task ? $this->scheduledTaskSubscribersRepository->countUnprocessed($task) : 0;
|
||||
$queue->setCountProcessed($processed);
|
||||
$queue->setCountToProcess($unprocessed);
|
||||
$queue->setCountTotal($processed + $unprocessed);
|
||||
}
|
||||
$this->entityManager->flush();
|
||||
}
|
||||
|
||||
/** @param int[] $ids */
|
||||
public function deleteByNewsletterIds(array $ids): void {
|
||||
$this->entityManager->createQueryBuilder()
|
||||
->delete(SendingQueueEntity::class, 'q')
|
||||
->where('q.newsletter IN (:ids)')
|
||||
->setParameter('ids', $ids)
|
||||
->getQuery()
|
||||
->execute();
|
||||
|
||||
// delete was done via DQL, make sure the entities are also detached from the entity manager
|
||||
$this->detachAll(function (SendingQueueEntity $entity) use ($ids) {
|
||||
$newsletter = $entity->getNewsletter();
|
||||
return $newsletter && in_array($newsletter->getId(), $ids, true);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1 @@
|
||||
<?php
|
||||
Reference in New Issue
Block a user