Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V4.1.x #40

Merged
merged 3 commits into from
Sep 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions src/Command/Command.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

use Kcloze\Jobs\Config;
use Kcloze\Jobs\Logs;
use Kcloze\Jobs\Utils;
use Symfony\Component\Console\Command\Command as SCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
Expand Down Expand Up @@ -85,8 +86,9 @@ protected function sendSignal($signal=SIGUSR1)
$this->logger->log($signal . (SIGUSR1 == $signal) ? ' smooth to exit...' : ' force to exit...');

if (isset($this->config['pidPath']) && !empty($this->config['pidPath'])) {
$masterPidFile=$this->config['pidPath'] . '/master.pid';
$pidStatusFile=$this->config['pidPath'] . '/status.info';
$this->config['pidPath']=$this->config['pidPath'] . '/' . Utils::getHostName();
$masterPidFile =$this->config['pidPath'] . '/master.pid';
$pidStatusFile =$this->config['pidPath'] . '/status.info';
} else {
echo 'config pidPath must be set!' . PHP_EOL;

Expand Down Expand Up @@ -177,10 +179,10 @@ protected function sendSignalHttpServer($signal=SIGTERM)
}

private function checkSwooleSetting()
{
if(version_compare(swoole_version(), '4.0.0', '>=') && 'Off'!==ini_get('swoole.enable_coroutine')){
$this->output->writeln("swoole version >=4.0.0,you have to disable coroutine in php.ini");
$this->output->writeln("details jump to: https://github.com/swoole/swoole-src/issues/2716");
{
if (version_compare(swoole_version(), '4.0.0', '>=') && 'Off' !== ini_get('swoole.enable_coroutine')) {
$this->output->writeln('swoole version >=4.0.0,you have to disable coroutine in php.ini');
$this->output->writeln('details jump to: https://github.com/swoole/swoole-src/issues/2716');
exit;
}
}
Expand Down
11 changes: 5 additions & 6 deletions src/Jobs.php
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public function run($topic='')
if (true === $autoAckBeforeJobStart) {
$this->queue->ack();
}
if (!empty($data) && (is_object($data) || is_array($data))) {
if (!empty($data) && (\is_object($data) || \is_array($data))) {
$beginTime=microtime(true);
// 根据自己的业务需求改写此方法
$jobObject = $this->loadObject($data);
Expand All @@ -105,19 +105,18 @@ public function run($topic='')
//进程安全退出
exit;
}

unset($jobObject, $baseAction);
} else {
$this->logger->log('pop error data: ' . print_r($data, true), 'error', 'error');
}
//防止内存泄漏,每次执行一个job就退出[极端情况才需要开启]
if (isset($this->config['eachJobExit']) && true == $this->config['eachJobExit']) {
$this->logger->log('Each Job Exit' . PHP_EOL);
$this->logger->log('Each Job Exit, job id: ' . $jobObject->uuid . PHP_EOL);
exit;
}
// if ($this->queue->len($topic) <= 0) {
// break;
// }
unset($jobObject, $baseAction);
} while ($this->popNum <= $this->maxPopNum);
} else {
sleep($this->sleep);
Expand Down Expand Up @@ -202,9 +201,9 @@ private function loadFrameworkAction()
//实例化job对象
private function loadObject($data)
{
if (is_object($data)) {
if (\is_object($data)) {
return new JobObject($data->topic ?? '', $data->jobClass ?? '', $data->jobMethod ?? '', $data->jobParams ?? [], $data->jobExtras ?? [], $data->uuid ?? '');
} elseif (is_array($data)) {
} elseif (\is_array($data)) {
return new JobObject($data['topic'] ?? '', $data['jobClass'] ?? '', $data['jobMethod'] ?? '', $data['jobParams'] ?? [], $data['jobExtras'] ?? [], $data['uuid'] ?? '');
}

Expand Down
38 changes: 20 additions & 18 deletions src/Process.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* This file is part of PHP CS Fixer.
* This file is part of Swoole-jobs
* (c) kcloze <[email protected]>
* This source file is subject to the MIT license that is bundled
* with this source code in the file LICENSE.
Expand Down Expand Up @@ -65,6 +65,8 @@ public function __construct()
$this->status=self::STATUS_RUNNING;

if (isset($this->config['pidPath']) && !empty($this->config['pidPath'])) {
//兼容docker部署多个容器共用一个数据目录的问题
$this->config['pidPath']=$this->config['pidPath'] . '/' . Utils::getHostName();
Utils::mkdir($this->config['pidPath']);
$this->pidFile =$this->config['pidPath'] . '/' . $this->pidFile;
$this->pidInfoFile =$this->config['pidPath'] . '/' . $this->pidInfoFile;
Expand Down Expand Up @@ -97,7 +99,7 @@ public function __construct()
$data['status']=$this->status;
$this->saveMasterData($data);
//主进程禁用协程
$this->disableCoroutine();
//$this->disableCoroutine();
$this->setProcessName(self::APP_NAME . ' master ' . $this->ppid . $this->processName);
}

Expand All @@ -112,7 +114,7 @@ public function start()
if (isset($topic['workerMinNum']) && isset($topic['name'])) {
//每个topic开启最少个进程消费队列
for ($i = 0; $i < $topic['workerMinNum']; ++$i) {
$this->reserveQueue($i, $topic['name'], \Kcloze\Jobs\Process::CHILD_PROCESS_CAN_RESTART);
$this->reserveQueue($i, $topic['name'], self::CHILD_PROCESS_CAN_RESTART);
}
}
}
Expand Down Expand Up @@ -198,11 +200,11 @@ public function registSignal()
$topic = $this->workersInfo[$pid]['topic'] ?? '';
$this->status=$this->getMasterData('status');
$topicCanNotRestartNum = $this->dynamicWorkerNum[$topic] ?? 'null';
$this->logger->log(Process::CHILD_PROCESS_CAN_RESTART . '---' . $topic . '***' . $topicCanNotRestartNum . '***' . $this->status . '***' . $this->workersInfo[$pid]['type'] . '***' . $pid, 'info', $this->logSaveFileWorker);
$this->logger->log($pid . ',' . $this->status . ',' . Process::STATUS_RUNNING . ',' . $this->workersInfo[$pid]['type'] . ',' . Process::CHILD_PROCESS_CAN_RESTART, 'info', $this->logSaveFileWorker);
$this->logger->log(self::CHILD_PROCESS_CAN_RESTART . '---' . $topic . '***' . $topicCanNotRestartNum . '***' . $this->status . '***' . $this->workersInfo[$pid]['type'] . '***' . $pid, 'info', $this->logSaveFileWorker);
$this->logger->log($pid . ',' . $this->status . ',' . self::STATUS_RUNNING . ',' . $this->workersInfo[$pid]['type'] . ',' . self::CHILD_PROCESS_CAN_RESTART, 'info', $this->logSaveFileWorker);

//主进程状态为running并且该子进程是可以重启的
if (Process::STATUS_RUNNING == $this->status && Process::CHILD_PROCESS_CAN_RESTART == $this->workersInfo[$pid]['type']) {
if (self::STATUS_RUNNING == $this->status && self::CHILD_PROCESS_CAN_RESTART == $this->workersInfo[$pid]['type']) {
try {
//子进程重启可能失败,必须启动成功之后,再往下执行;最多尝试30次
for ($i=0; $i < 30; ++$i) {
Expand All @@ -220,7 +222,7 @@ public function registSignal()
}

$this->workers[$newPid] = $childProcess;
$this->workersInfo[$newPid]['type'] = Process::CHILD_PROCESS_CAN_RESTART;
$this->workersInfo[$newPid]['type'] = self::CHILD_PROCESS_CAN_RESTART;
$this->workersInfo[$newPid]['topic'] = $topic;
++$this->workerNum;
$this->logger->log("Worker Restart, kill_signal={$ret['signal']} PID=" . $newPid, 'info', $this->logSaveFileWorker);
Expand All @@ -231,16 +233,16 @@ public function registSignal()
}
}
//某个topic动态变化的子进程,退出之后个数减少一个
if (Process::CHILD_PROCESS_CAN_NOT_RESTART == $this->workersInfo[$pid]['type']) {
if (self::CHILD_PROCESS_CAN_NOT_RESTART == $this->workersInfo[$pid]['type']) {
--$this->dynamicWorkerNum[$topic];
}
$this->logger->log("Worker Exit, kill_signal={$ret['signal']} PID=" . $pid, 'info', $this->logSaveFileWorker);
unset($this->workers[$pid], $this->workersInfo[$pid]);
--$this->workerNum;

$this->logger->log('Worker count: ' . count($this->workers) . '==' . $this->workerNum, 'info', $this->logSaveFileWorker);
$this->logger->log('Worker count: ' . \count($this->workers) . '==' . $this->workerNum, 'info', $this->logSaveFileWorker);
//如果$this->workers为空,且主进程状态为wait,说明所有子进程安全退出,这个时候主进程退出
if (empty($this->workers) && Process::STATUS_WAIT == $this->status) {
if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
$this->logger->log('主进程收到所有信号子进程的退出信号,子进程安全退出完成', 'info', $this->logSaveFileWorker);
$this->exitMaster();
}
Expand All @@ -257,7 +259,7 @@ public function registTimer()
\Swoole\Timer::tick($this->queueTickTimer, function ($timerId) {
$topics = $this->topics;
$this->status =$this->getMasterData('status');
if (empty($this->workers) && Process::STATUS_WAIT == $this->status) {
if (empty($this->workers) && self::STATUS_WAIT == $this->status) {
$this->exitMaster();
}
$this->queue = Queue::getQueue($this->config['job']['queue'], $this->logger);
Expand All @@ -268,7 +270,7 @@ public function registTimer()
}
$this->queue->setTopics($topics);

if ($topics && Process::STATUS_RUNNING == $this->status) {
if ($topics && self::STATUS_RUNNING == $this->status) {
//遍历topic任务列表
foreach ((array) $topics as $topic) {
if (empty($topic['name'])) {
Expand Down Expand Up @@ -297,7 +299,7 @@ public function registTimer()
//如果当个队列设置了queueMaxNum项,以这个值作为是否警告的阀值;
$queueMaxNum = $topic['queueMaxNum'] ?? $this->queueMaxNum;
//消息提醒:消息体收集
if ($len > $queueMaxNum && count($this->message) <= count($topics) && count($this->message) <= 5) {
if ($len > $queueMaxNum && \count($this->message) <= \count($topics) && \count($this->message) <= 5) {
$this->message[]= strtr('Hostname: {hostname} Time:{time} Pid:{pid} ProName:{pname} Topic:{topic} Message:{message}' . PHP_EOL . '--------------' . PHP_EOL, [
'{time}' => date('Y-m-d H:i:s'),
'{pid}' => $this->ppid,
Expand All @@ -310,11 +312,11 @@ public function registTimer()

//如果当个队列设置了queueMaxNumForProcess项,以这个值作为是否拉起动态子进程的阀值;
$queueMaxNumForProcess = $topic['queueMaxNumForProcess'] ?? $this->queueMaxNumForProcess;
if ($topic['workerMaxNum'] > $topic['workerMinNum'] && Process::STATUS_RUNNING == $this->status && $len > $queueMaxNumForProcess && $this->dynamicWorkerNum[$topic['name']] < $topic['workerMaxNum']) {
if ($topic['workerMaxNum'] > $topic['workerMinNum'] && self::STATUS_RUNNING == $this->status && $len > $queueMaxNumForProcess && $this->dynamicWorkerNum[$topic['name']] < $topic['workerMaxNum']) {
$max=$topic['workerMaxNum'] - $this->dynamicWorkerNum[$topic['name']];
for ($i=0; $i < $max; ++$i) {
//队列堆积达到一定数据,拉起一次性子进程,这类进程不会自动重启[没必要]
$this->reserveQueue($this->dynamicWorkerNum[$topic['name']], $topic['name'], Process::CHILD_PROCESS_CAN_NOT_RESTART);
$this->reserveQueue($this->dynamicWorkerNum[$topic['name']], $topic['name'], self::CHILD_PROCESS_CAN_NOT_RESTART);
++$this->dynamicWorkerNum[$topic['name']];
$this->logger->log('topic: ' . $topic['name'] . ' ' . $this->status . ' len: ' . $len . ' for: ' . $i . ' ' . $max, 'info', $this->logSaveFileWorker);
}
Expand Down Expand Up @@ -358,7 +360,7 @@ private function killWorkersAndExitMaster()
\Swoole\Process::kill($pid);
unset($this->workers[$pid]);
$this->logger->log('主进程收到退出信号,[' . $pid . ']子进程跟着退出', 'info', $this->logSaveFileWorker);
$this->logger->log('Worker count: ' . count($this->workers), 'info', $this->logSaveFileWorker);
$this->logger->log('Worker count: ' . \count($this->workers), 'info', $this->logSaveFileWorker);
}
}
$this->exitMaster();
Expand Down Expand Up @@ -392,7 +394,7 @@ private function checkMpid(&$worker)
private function setProcessName($name)
{
//mac os不支持进程重命名
if (function_exists('swoole_set_process_name') && PHP_OS != 'Darwin') {
if (\function_exists('swoole_set_process_name') && PHP_OS != 'Darwin') {
swoole_set_process_name($name);
}
}
Expand Down Expand Up @@ -424,7 +426,7 @@ private function showStatus()
$statusStr .= 'Now: ' . date('Y-m-d H:i:s') . ' PHP version:' . PHP_VERSION . ' Swoole-jobs version: ' . $this->version . PHP_EOL;
$statusStr .= 'start time : ' . date('Y-m-d H:i:s', $this->beginTime) . ' run ' . floor((time() - $this->beginTime) / (24 * 60 * 60)) . ' days ' . floor(((time() - $this->beginTime) % (24 * 60 * 60)) / (60 * 60)) . ' hours ' . PHP_EOL;
$statusStr .= Utils::getSysLoadAvg() . ' memory use:' . Utils::getServerMemoryUsage() . PHP_EOL;
$statusStr .= '|-- Master pid ' . $this->ppid . ' status: ' . $this->status . ' Worker num: ' . count($this->workers) . PHP_EOL;
$statusStr .= '|-- Master pid ' . $this->ppid . ' status: ' . $this->status . ' Worker num: ' . \count($this->workers) . PHP_EOL;
if ($this->workers) {
foreach ($this->workers as $pid => $value) {
$type =$this->workersInfo[$pid]['type'];
Expand Down
10 changes: 5 additions & 5 deletions src/Queue/RabbitmqTopicQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public function push($topic, JobObject $job, $delayStrategy=1, $serializeFunc='p
}

$queue = $this->createQueue($topic);
if (!is_object($queue)) {
if (!\is_object($queue)) {
//对象有误 则直接返回空
return '';
}
Expand Down Expand Up @@ -138,23 +138,23 @@ public function pop($topic, $unSerializeFunc='php')
}
//reset consumer and message properties
$this->consumer=null;
$this->message=null;
$this->message =null;

$queue = $this->createQueue($topic);
$consumer = $this->context->createConsumer($queue);

if ($m = $consumer->receive(1)) {
$result =$m->getBody();
$this->consumer =$consumer;
$this->message =$m;
$this->message =$m;
//判断字符串是否是php序列化的字符串,目前只允许serialzie和json两种
$unSerializeFunc=Serialize::isSerial($result) ? 'php' : 'json';

return !empty($result) ? Serialize::unserialize($result, $unSerializeFunc) : null;
}
}

public function ack(): boolean
public function ack(): bool
{
if ($this->consumer && $this->message) {
$this->consumer->acknowledge($this->message);
Expand All @@ -174,7 +174,7 @@ public function len($topic): int
}

$queue = $this->createQueue($topic);
if (!is_object($queue)) {
if (!\is_object($queue)) {
//对象有误 则直接返回空
return -1;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Queue/RedisTopicQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public function pop($topic, $unSerializeFunc='php')
}

//redis不支持ack功能,搞个假的,没办法
public function ack(): boolean
public function ack(): bool
{
return true;
}
Expand Down
8 changes: 4 additions & 4 deletions src/Queue/TopicQueueInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,10 @@ public function push($topic, JobObject $job): string;
*/
public function pop($topic);

/**
* ack确认消息
*/
public function ack(): boolean;
/**
* ack确认消息.
*/
public function ack(): bool;

/**
* @param $topic
Expand Down
17 changes: 14 additions & 3 deletions src/Utils.php
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php

/*
* This file is part of PHP CS Fixer.
* This file is part of Swoole-jobs
* (c) kcloze <[email protected]>
* This source file is subject to the MIT license that is bundled
* with this source code in the file LICENSE.
Expand All @@ -27,7 +27,7 @@ public static function mkdir($path, $mode=0777, $recursive=true)

public static function catchError(Logs $logger, $exception)
{
$error = '错误类型:' . get_class($exception) . PHP_EOL;
$error = '错误类型:' . \get_class($exception) . PHP_EOL;
$error .= '错误代码:' . $exception->getCode() . PHP_EOL;
$error .= '错误信息:' . $exception->getMessage() . PHP_EOL;
$error .= '错误堆栈:' . $exception->getTraceAsString() . PHP_EOL;
Expand Down Expand Up @@ -57,8 +57,19 @@ public static function getServerMemoryUsage()
*/
public static function getSysLoadAvg()
{
$loadavg = function_exists('sys_getloadavg') ? array_map('round', sys_getloadavg(), [2]) : ['-', '-', '-'];
$loadavg = \function_exists('sys_getloadavg') ? array_map('round', sys_getloadavg(), [2]) : ['-', '-', '-'];

return 'load average: ' . implode(', ', $loadavg);
}

//获取机器名称
public static function getHostName()
{
$hostname=gethostname();
if (empty($hostname)) {
$hostname='system';
}

return strtolower(trim($hostname));
}
}