博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
PHP 高级编程之多线程
阅读量:4157 次
发布时间:2019-05-26

本文共 10148 字,大约阅读时间需要 33 分钟。

1. 多线程环境安装

1.1. PHP 5.5.9

安装PHP 5.5.9

https://github.com/oscm/shell/blob/master/php/5.5.9.sh

./configure --prefix=/srv/php-5.5.9 \--with-config-file-path=/srv/php-5.5.9/etc \--with-config-file-scan-dir=/srv/php-5.5.9/etc/conf.d \--enable-fpm \--with-fpm-user=www \--with-fpm-group=www \--with-pear \--with-curl \--with-gd \--with-jpeg-dir \--with-png-dir \--with-freetype-dir \--with-zlib-dir \--with-iconv \--with-mcrypt \--with-mhash \--with-pdo-mysql \--with-mysql-sock=/var/lib/mysql/mysql.sock \--with-openssl \--with-xsl \--with-recode \--enable-sockets \--enable-soap \--enable-mbstring \--enable-gd-native-ttf \--enable-zip \--enable-xml \--enable-bcmath \--enable-calendar \--enable-shmop \--enable-dba \--enable-wddx \--enable-sysvsem \--enable-sysvshm \--enable-sysvmsg \--enable-opcache \--enable-pcntl \--enable-maintainer-zts \--disable-debug

编译必须启用zts支持否则无法安装 pthreads(--enable-maintainer-zts)

1.2. 安装 pthreads 扩展

安装https://github.com/oscm/shell/blob/master/php/pecl/pthreads.sh

# curl -s https://raw.github.com/oscm/shell/master/php/pecl/pthreads.sh | bash

查看pthreads是否已经安装

# php -m | grep pthreads

2. Thread

world = $world; } public function run() { print_r(sprintf("Hello %s\n", $this->world)); }}$thread = new HelloWorld("World");if ($thread->start()) { printf("Thread #%lu says: %s\n", $thread->getThreadId(), $thread->join());}?>

3. Worker 与 Stackable

sql = $sql; } public function run() { $dbh = $this->worker->getConnection(); $row = $dbh->query($this->sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } }}class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456'); } public function getConnection(){ return self::$dbh; }}$worker = new ExampleWorker("My Worker Thread");$work=new SQLQuery('select * from members order by id desc limit 5');$worker->stack($work);$table1 = new SQLQuery('select * from demousers limit 2');$worker->stack($table1);$worker->start();$worker->shutdown();?>

4. 互斥锁

什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。

下面我们举一个例子,一个简单的计数器程序,说明有无互斥锁情况下的不同。

mutex = $mutex; $this->handle = fopen("/tmp/counter.txt", "w+"); } public function __destruct(){ fclose($this->handle); } public function run() { if($this->mutex) $locked=Mutex::lock($this->mutex); $counter = intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); }}//没有互斥锁for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread(); $threads[$i]->start();}//加入互斥锁$mutex = Mutex::create(true);for ($i=0;$i<50;$i++){ $threads[$i] = new CounterThread($mutex); $threads[$i]->start();}Mutex::unlock($mutex);for ($i=0;$i<50;$i++){ $threads[$i]->join();}Mutex::destroy($mutex);?>

我们使用文件/tmp/counter.txt保存计数器值,每次打开该文件将数值加一,然后写回文件。当多个线程同时操作一个文件的时候,就会线程运行先后取到的数值不同,写回的数值也不同,最终计数器的数值会混乱。

没有加入锁的结果是计数始终被覆盖,最终结果是2

而加入互斥锁后,只有其中的一个进程完成加一工作并释放锁,其他线程才能得到解锁信号,最终顺利完成计数器累加操作

上面例子也可以通过对文件加锁实现,这里主要讲的是多线程锁,后面会涉及文件锁。

4.1. 多线程与共享内存

在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能。

shmid = $shmid; } public function run() { $counter = shm_get_var( $this->shmid, 1 ); $counter++; shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); }}for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){ $threads[$i]->start();}for ($i=0;$i<100;$i++){ $threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

5. 线程同步

有些场景我们不希望 thread->start() 就开始运行程序,而是希望线程等待我们的命令。

$thread->wait();测作用是 thread->start()后线程并不会立即运行,只有收到 $thread->notify(); 发出的信号后才运行

shmid = $shmid; } public function run() { $this->synchronized(function($thread){ $thread->wait(); }, $this); $counter = shm_get_var( $this->shmid, 1 ); $counter++; shm_put_var( $this->shmid, 1, $counter ); printf("Thread #%lu says: %s\n", $this->getThreadId(),$counter); }}for ($i=0;$i<100;$i++){ $threads[] = new CounterThread($shmid);}for ($i=0;$i<100;$i++){ $threads[$i]->start();}for ($i=0;$i<100;$i++){ $threads[$i]->synchronized(function($thread){ $thread->notify(); }, $threads[$i]);}for ($i=0;$i<100;$i++){ $threads[$i]->join();}shm_remove( $shmid );shm_detach( $shmid );?>

6. 线程池

6.1. 线程池

自行实现一个Pool类

row = $row; $this->sql = null; } public function run() { if(strlen($this->row['bankno']) > 100 ){ $bankno = safenet_decrypt($this->row['bankno']); }else{ $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); }}class Pool { public $pool = array(); public function __construct($count) { $this->count = $count; } public function push($row){ if(count($this->pool) < $this->count){ $this->pool[] = new Update($row); return true; }else{ return false; } } public function start(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->start(); } } public function join(){ foreach ( $this->pool as $id => $worker){ $this->pool[$id]->join(); } } public function clean(){ foreach ( $this->pool as $id => $worker){ if(! $worker->isRunning()){ unset($this->pool[$id]); } } }}try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc"; $row = $dbh->query($sql); $pool = new Pool(5); while($member = $row->fetch(PDO::FETCH_ASSOC)) { while(true){ if($pool->push($member)){ //压入任务到池中 break; }else{ //如果池已经满,就开始启动线程 $pool->start(); $pool->join(); $pool->clean(); } } } $pool->start(); $pool->join(); $dbh = null;} catch (Exception $e) { echo '[' , date('H:i:s') , ']', '系统错误', $e->getMessage(), "\n";}?>

6.2. 动态队列线程池

上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。

row = $row; $this->sql = null; //print_r($this->row); } public function run() { if(strlen($this->row['bankno']) > 100 ){ $bankno = safenet_decrypt($this->row['bankno']); }else{ $error = sprintf("%s, %s\r\n",$this->row['id'], $this->row['bankno']); file_put_contents("bankno_error.log", $error, FILE_APPEND); } if( strlen($bankno) > 7 ){ $sql = sprintf("update members set bankno = '%s' where id = '%s';", $bankno, $this->row['id']); $this->sql = $sql; } printf("%s\n",$this->sql); }}try { $dbh = new PDO("mysql:host=" . str_replace(':', ';port=', $dbhost) . ";dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF8\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select id,bankno from members order by id desc limit 50"; $row = $dbh->query($sql); $pool = array(); while($member = $row->fetch(PDO::FETCH_ASSOC)) { $id = $member['id']; while (true){ if(count($pool) < 5){ $pool[$id] = new Update($member); $pool[$id]->start(); break; }else{ foreach ( $pool as $name => $worker){ if(! $worker->isRunning()){ unset($pool[$name]); } } } } } $dbh = null;} catch (Exception $e) { echo '【' , date('H:i:s') , '】', '【系统错误】', $e->getMessage(), "\n";}?>

6.3. pthreads Pool类

pthreads 提供的 Pool class 例子

logger = $logger; } protected $loger; }class WebWork extends Stackable { public function isComplete() { return $this->complete; } public function run() { $this->worker ->logger ->log("%s executing in Thread #%lu", __CLASS__, $this->worker->getThreadId()); $this->complete = true; } protected $complete;}class SafeLog extends Stackable { protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf( "{$message}\n", $args); } }}$pool = new Pool(8, \WebWorker::class, [new SafeLog()]);$pool->submit($w=new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->submit(new WebWork());$pool->shutdown();$pool->collect(function($work){ return $work->isComplete();});var_dump($pool);

7. 多线程文件安全读写(文件锁)

文件所种类。

LOCK_SH 取得共享锁定(读取的程序)。LOCK_EX 取得独占锁定(写入的程序。LOCK_UN 释放锁定(无论共享或独占)。LOCK_NB 如果不希望 flock() 在锁定时堵塞

共享锁例子

共享锁例子2

8. 多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

worker->getConnection(); $sql = "select id,name from members order by id desc limit 50"; $row = $dbh->query($sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } }}class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=192.168.2.1;dbname=example','www','123456'); } public function getConnection(){ return self::$dbh; }}$worker = new ExampleWorker("My Worker Thread");$work=new Work();$worker->stack($work);$worker->start();$worker->shutdown();?>
来源:http://netkiller.github.io/journal/thread.php.html    
 

转载地址:http://uwkxi.baihongyu.com/

你可能感兴趣的文章
史上最详细MFC调用mapX5.02.26步骤(附地图测试GST文件)
查看>>
CMFCShellListCtrl使用方法
查看>>
mapnik的demo运行
查看>>
python支持下的mapnik安装
查看>>
milvus手册
查看>>
查看pytorch基于cuda 的哪个版本
查看>>
多目标跟踪的简单理解
查看>>
Near-Online Multi-target Tracking with Aggregated Local Flow Descriptor
查看>>
Joint Tracking and Segmentation of Multiple Targets
查看>>
Subgraph Decomposition for Multi-Target Tracking
查看>>
JOTS: Joint Online Tracking and Segmentation
查看>>
CDT: Cooperative Detection and Tracking for Tracing Multiple Objects in Video Sequences
查看>>
Improving Multi-frame Data Association with Sparse Representations for Robust Near-online Multi-ob
查看>>
Virtual Worlds as Proxy for Multi-Object Tracking Analysis
查看>>
Multi-view People Tracking via Hierarchical Trajectory Composition
查看>>
Online Multi-Object Tracking via Structural Constraint Event Aggregation
查看>>
The Solution Path Algotithm for Identity-Aware Multi-Object Tracking
查看>>
Groupwise Tracking of Crowded Similar-Appearance Targets from Low-Continuity Image Sequences
查看>>
CDTS: Collaborative Detection, Tracking, and Segmentation for Online Multiple Object Segmentation
查看>>
Deep Network Flow for Multi-Object Tracking
查看>>