threads: Threads are separate execution contexts that run in the same memory space, able to freely read and write a shared heap. They are part of the same process, and will usually share the same memory & file resources

CLI: pthreads threading

multithreading:

It's not a part of PHP core, so you'll have to install it:

pecl install pthreads

With pthreads v3 pthreads can only be loaded when using the cli SAPI, thus it is a good practice to keep the extension=pthreads.so directive in php-cli.ini ONLY, if you are using PHP7 and Pthreads v3.

echo "extension=pthreads.so" >> /etc/php.ini
// Define and instantiate a one-off Thread subclass in a single expression.
$thread = new class extends Thread {
    /**
     * Body of the thread: stores a greeting on the thread object.
     */
    public function run()
    {
        $this->data = "Hello World\n";
    }
};

// join() is only attempted when start() reported success.
if ($thread->start()) {
    $thread->join();
}
// we can now access $thread->data
class MyThread extends Thread {
    /**
     * @var string the message to be displayed
     */
    private $message;

    /**
     * @param string $message text this thread prints when it runs
     */
    public function __construct(string $message) {
        // Set the message value for this particular instance.
        $this->message = $message;
    }

    /**
     * The operations performed in this method are executed in the other thread.
     */
    public function run() {
        echo $this->message;
    }
}
// Build the thread object, handing it the text it should print.
$myThread = new MyThread("Hello from an another thread!");

// start() launches the thread.
$myThread->start();

// join() makes the interpreter wait here until the thread has finished.
$myThread->join();

Pools

// This is the *Work* which would be run by the worker.
// The work which you'd want to do in your worker.
// This class needs to extend the \Threaded or \Collectable or \Thread class.
class AwesomeWork extends Thread {
    /**
     * @var string name given to this piece of work
     */
    private $workName;

    /**
     * @param string $workName
     * The work name which would be given to every work.
     */
    public function __construct(string $workName) {
        // The code in the constructor runs when the work object is created,
        // i.e. right as it is being handed to the pool.
        $this->workName = $workName;
        printf("A new work was submitted with the name: %s\n", $workName);
    }

    /**
     * Called by the worker; all the code in this method is executed in another thread.
     */
    public function run() {
        $workName = $this->workName;
        printf("Work named %s starting...\n", $workName);
        printf("New random number: %d\n", mt_rand());
    }
}
// A deliberately empty worker, kept minimal for the sake of simplicity.
class AwesomeWorker extends Worker
{
    /**
     * Hook for code that should be executed by the Worker before the
     * Works' own `run` methods are started. Intentionally left empty.
     */
    public function run()
    {
        /* ... */
    }
}
// Build the pool. The \Pool constructor is given two arguments here:
//   1. the maximum number of workers the pool can create,
//   2. the name of the worker class.
$pool = new \Pool(1, \AwesomeWorker::class);

// Jobs are submitted as instances of the work objects
// (objects which extend the \Threaded class).
foreach (["DeadlyWork", "FatalWork"] as $workName) {
    $pool->submit(new \AwesomeWork($workName));
}

// The pool has to be shut down explicitly, otherwise
// unexpected things may happen.
// See: http://stackoverflow.com/a/23600861/23602185
$pool->shutdown();

Amp Promises

composer require amphp/amp:^1

the basic approach using it: Generators and Coroutines (since PHP 5.5, better syntax in PHP 7)

  • based on pthreads
  • promises-based interface
  • designed specifically for cli application
/**
 * Placeholder job: does its work and hands back the outcome.
 *
 * @return string always the literal 'result' in this stub
 */
function worker() {
    // Do some work
    $outcome = 'result';

    return $outcome;
}
$dispatcher = new Amp\Thread\Dispatcher;
// call 2 functions to be executed asynchronously
$promise1 = $dispatcher->call('worker');
$promise2 = $dispatcher->call('worker');
// combine both promises into one and wait for it
$comboPromise = Amp\all([$promise1, $promise2]);
list($result1, $result2) = $comboPromise->wait();
// $result1 & $result2 now contain the results of both threads

process spawn

Run commands in parallel by spawning separate processes (spawn); the spawned processes won't share any memory with the caller.

// nohup: Run COMMAND, ignoring hangup signals.
// Output is redirected and the command is backgrounded with "&" so exec() does not wait for it.
// Inside single quotes PHP does not interpolate $arg1, so the value is concatenated explicitly
// and passed through escapeshellarg() to reach the command as one literal argument.
exec('nohup /usr/bin/my-command ' . escapeshellarg($arg1) . ' > /dev/null 2>&1 &');

pcntl forking multiprocessing

Forking a process will result in the request being cloned into an exact replica, though with its own address space.

After forking, changing a variable's value in one process doesn't affect the other process though.

basic:

// Value set before the fork: both processes start with their own copy of it.
$var = 'one';

// pcntl_fork() returns -1 on failure, 0 inside the child, and the child's PID inside the parent.
$pid = pcntl_fork();

if ($pid === -1) {
    die('failed to fork');
} elseif ($pid === 0) {
    // $pid === 0, this is the child process
    // Existing variables will live in both processes,
    // but changes will not affect other process.
    echo $var; // will output 'one'
    $var = 'two'; // will not affect parent process
    // Do some work
} else {
    // $pid != 0, this is the parent process
    // Do some work, while already doing other work in the child process.
    echo $var; // will output 'one'
    $var = 'three'; // will not affect child process
    // make sure the parent outlives the child process
    pcntl_wait($status);
}

multiple forks:

if (!function_exists('pcntl_fork')) {
    die('PCNTL functions not available on this PHP installation');
}

$maxt = 5; // number of child processes to fork
// NOTE(review): the comparison operator was missing in the original; "<=" is assumed
// so that exactly $maxt children are forked — confirm.
for ($x = 1; $x <= $maxt; $x++) {
    switch ($pid = pcntl_fork()) {
        case -1:
            die('could not fork');

        case 0:
            // run child
            $local_pid = posix_getpid();
            slog($local_pid);
            io_detach();

            register_shutdown_function(function () {
                posix_kill(posix_getpid(), SIGHUP);
            });
            // Leave the script here: without this the child would fall out of the
            // switch, continue the for loop and start forking children of its own.
            exit(0);

        default:
            // parent
            echo "parent: \n";
            // Blocks until this particular child has terminated.
            pcntl_waitpid($pid, $status);

            $local_pid = posix_getpid();
            slog($local_pid);
            break;
    }
}

// make sure all child processes have completed: pcntl_waitpid() returns -1 once there is nothing left to wait for
while (pcntl_waitpid(0, $status) != -1) {
    $status = pcntl_wexitstatus($status);
    echo "Child $status completed\n";
}
/**
 * Discards any pending output buffer and closes the three standard streams,
 * as is done when a process is going to run as a daemon.
 */
function io_detach() {
    // ob_end_clean() raises a notice when no output buffer is active,
    // so only discard a buffer when there is one.
    if (ob_get_level() > 0) {
        ob_end_clean();
    }
    fclose(STDIN);  // Close all of the standard
    fclose(STDOUT); // file descriptors as we
    fclose(STDERR); // are running as a daemon.
}
// Fork one child process per task.
foreach ($tasks as $task) {
    $pid = pcntl_fork();

    if ($pid === -1) {
        die('could not fork');
    } elseif ($pid) {
        // parent: protect against zombie children, one wait per child;
        // WNOHANG makes the call return immediately instead of blocking
        pcntl_wait($status, WNOHANG);
    } else {
        // child ($pid === 0)
        ob_start(); // prevent output to main process
        // Kill ourselves at shutdown, before the normal exit sequence runs,
        // or else the resources shared with the parent will be closed.
        register_shutdown_function(function () {
            ob_end_clean();
            posix_kill(getmypid(), SIGKILL);
        });
        exit(0); // avoid continuing the foreach loop in the child process
    }
}

IPC

types available in PHP:

- Signals (posix_kill to send a signal, pcntl_signal to set up a signal handler), a limited type of message passing. Signals aren't particularly useful in scripted pages, as each script should run for a very short time.

- Sockets for data. Sockets can use a network, or can be local.

- Pipes for data. posix_mkfifo is used to create named pipes (aka FIFOs), and the standard file functions are used to read and write data. Unnamed (aka anonymous) pipes can be created between parent and child processes using popen or proc_open. Note unnamed pipes cannot be created between arbitrary processes. Note that pipes on some systems are unidirectional: a pipe handle can be used either to read or write, but not both.

- Semaphores for synchronization.

- Message queues for messaging. In PHP, the Semaphore extension offers both message queues and another set of shared memory functions (e.g. shm_attach). Many other extensions for various messaging protocols are also available, including SAM, STOMP and AMQP. See "Other Services" in the PHP manual for, well, others.

- Network stream wrappers for data. At a lower level, these are just sockets, though they provide a different interface. They are also for specific application level protocols, whereas sockets are more general.

- Network protocol extensions, such as cURL, for messaging & data. Like stream wrappers, these are (limited) sockets in disguise.

- Web service extensions, such as SOAP and XML-RPC, for remote procedure calls (RPC). Note that while these are socket based, they're for a different type of IPC (RPC rather than data).

While sockets (and anything based on them, such as stream wrappers) and pipes can be used to pass data between processes, their abilities with more than two processes are limited. Sockets can only connect two processes; to handle more than two, multiple sockets need to be opened (which is where a client-server architecture usually comes into it). With pipes, only one process can read given data; once it has, that data won't be available to other readers, though they can read other data (which will then become unavailable to all but the reader). An arbitrary number of processes can open the same shared memory region.

<?php
// generate a System V IPC key from this file's path
$shm_key = ftok(__FILE__, 't');
// open the 100-byte block with permissions 0644, creating it if needed (flag "c")
$shm_id = shmop_open($shm_key, "c", 0644, 100);
if ($shm_id === false) {
    die('could not open the shared memory block');
}
$data = 'test';
// write the serialized value at offset 0; the return value is the number of bytes written
$shm_bytes_written = shmop_write($shm_id, serialize($data), 0);
// read the same bytes back; object instantiation is disabled because any process
// with access to the block could have written to it
$shm_data = unserialize(shmop_read($shm_id, 0, $shm_bytes_written), ['allowed_classes' => false]);

shared mem con semafori( flag sysvipc ):

$shm_key = ftok(__FILE__, 'R');
// Semaphore used to serialize access to the segment (one holder at a time).
// sem_get() takes a permission mode; PHP defines no IPC_CREAT constant.
$shmid = sem_get($shm_key, 1, 0644);
// $data becomes the handle to the shared memory segment. shm_attach() takes the
// integer key (not the semaphore) and creates the 1024-byte segment if needed.
$data = shm_attach($shm_key, 1024, 0644);
// variables stored in the segment are addressed by an integer key
$inmem = 1;

sem_acquire($shmid);

// write to sm
shm_put_var($data, $inmem, "test");

// read sm
printf("shared contents: %s\n", shm_get_var($data, $inmem));

sem_release($shmid);

shm_detach($data);

per comunicare tra programmi in linguaggi differenti, possibili alternative:

semafori

// Open (or create, flag 'c') a shared memory block of $shsize bytes.
$shmop = shmop_open(ftok(__FILE__, chr(time() & 255)), 'c', 0644, $shsize);
// Write two packed integers at this process's offset.
// NOTE(review): the operator between $proc and 16 was missing in the original;
// "*" (offsets 16 bytes apart per process) is assumed — confirm.
$written_size = shmop_write($shmop, pack("ii", $maxflips, $chksum), $proc * 16);
// Read $written_size bytes starting at $offset.
shmop_read($shmop, $offset, $written_size);

coda dei messaggi System V

// Open the message queue for this key, creating it when it does not exist yet.
// The second argument is only a permission mode; PHP defines no IPC_CREAT constant.
$shm_key = msg_get_queue(ftok(__FILE__, 'R'), 0666);

// send a message of type 1 (serialized, blocking)
if (!msg_send($shm_key, 1, 'This is message #1', true, true, $msg_err)) {
    echo "error $msg_err\n";
}

// receive a message of type 1, up to 16384 bytes, unserializing it
if (msg_receive($shm_key, 1, $msg_type, 16384, $msg, true, 0, $msg_error)) {
    // 'Quit' is treated as a control message and is not printed
    if ($msg != 'Quit') {
        echo "$msg\n";
    }
} else {
    echo "error: $msg_error \n";
}

print_r(msg_stat_queue($shm_key));

Popen

// launch child process child.php
// open child process
$child = popen('php child.php', 'r');
if ($child === false) {
    die('could not start child.php');
}
// Do some work, while already doing other
// work in the child process.

// Keep the pipe in blocking mode so the read below blocks the parent's execution
// until the child has finished; in non-blocking mode you get partial output.
stream_set_blocking($child, true);

// now get the response from the child and wait until it is complete:
$response = stream_get_contents($child);

// close the pipe to the child
pclose($child);
// using proc_open, will execute the command and return a resource that we can communicate with.
$descriptor = [
    0 => array('pipe', 'r'), // pipe for stdin of child
    1 => array('pipe', 'w'), // pipe for stdout of child
];
$process = proc_open('bash', $descriptor, $pipes);
if (is_resource($process)) {
    $cmd = 'pwd';
    fwrite($pipes[0], $cmd . "\n"); // write a cmd in stdin
    fclose($pipes[0]);
    echo $stdout = stream_get_contents($pipes[1]); // get stdout of executed cmd
    fclose($pipes[1]);
    $exit_status_code = proc_close($process); // get exit code
}
/*
proc_open runs the bash command with $descriptor as descriptor specifications. After that we use
is_resource to validate the process. Once done we can start interacting with the child process
using $pipes, which is generated according to the descriptor specifications.
After that we can simply use fwrite to write to stdin of the child process. In this case pwd followed by
a newline. Finally stream_get_contents is used to read stdout of the child process.
Always remember to close the child process by using proc_close(), which will terminate
the child and return the exit status code.
*/

Concurrent Curl

per eseguire operazioni di rete concorrenti conviene usare la libreria Curl anziché usare i thread a basso livello

/**
 * Concurrent request processing using the cURL curl_multi_* functions.
 *
 * @param array $requests map of key => request. Each request is an array with:
 *                        - 'url'        (required) the URL to fetch
 *                        - 'opts'       (optional) CURLOPT_* options for this request only
 *                        - 'post_data'  (optional) when non-empty the request is sent as a POST
 *                        - 'post_array' (optional) the POST fields; falls back to 'post_data'
 * @param array $opts     general options for all requests
 * @return array map of key => ['data' => ..., 'info' => ..., 'error' => ...]; on failure
 *               'data' and 'info' are null and 'error' holds the error string,
 *               on success 'error' is null
 */
function curl_multi_request(array $requests, array $opts = []) {
    // curl handles
    $chs = [];
    // merge general curl options args with defaults (values passed in take precedence)
    $opts += [CURLOPT_CONNECTTIMEOUT => 3, CURLOPT_TIMEOUT => 3, CURLOPT_RETURNTRANSFER => 1];

    $responses = [];
    // init curl multi handle
    $mh = curl_multi_init();
    // running flag
    $running = null;

    foreach ($requests as $key => $request) {
        // init individual curl handle
        $chs[$key] = curl_init();

        curl_setopt($chs[$key], CURLOPT_URL, $request['url']);

        // 'post_data' is optional, so test it without raising an undefined-index notice
        if (!empty($request['post_data'])) {
            curl_setopt($chs[$key], CURLOPT_POST, 1);
            $postFields = $request['post_array'] ?? $request['post_data'];
            if (is_array($postFields) || is_string($postFields)) {
                curl_setopt($chs[$key], CURLOPT_POSTFIELDS, $postFields);
            }
        }
        // set opts: per-request options override the general ones
        curl_setopt_array($chs[$key], ($request['opts'] ?? []) + $opts);
        curl_multi_add_handle($mh, $chs[$key]);
    }

    do {
        curl_multi_exec($mh, $running);
        // block to avoid needless cycling until change in status;
        // skipped once nothing is running any more
        if ($running > 0) {
            curl_multi_select($mh);
        }
        // check flag to see if we're done
    } while ($running > 0);

    // cycle through requests
    foreach ($chs as $key => $ch) {
        if (curl_errno($ch)) {
            // handle error
            $responses[$key] = ['data' => null, 'info' => null, 'error' => curl_error($ch)];
        } else {
            // save successful response
            $responses[$key] = ['data' => curl_multi_getcontent($ch), 'info' => curl_getinfo($ch), 'error' => null];
        }
        // detach the individual handle from the multi handle
        curl_multi_remove_handle($mh, $ch);
    }

    curl_multi_close($mh);

    return $responses;
}
// use: the array keys ('google', 'msu') are reused as the keys of $responses
$responses = curl_multi_request([
    'google' => ['url' => 'http://google.com', 'opts' => [CURLOPT_TIMEOUT => 2]],
    'msu' => ['url' => 'http://test.com'],
]);

alternatives could be fopen/fsockopen

// open child process
// NOTE(review): HTTP_HOST is taken from the incoming request's Host header;
// validate it against the expected host names before using it to build a URL.
$child = fopen('http://' . $_SERVER['HTTP_HOST'] . '/child.php', 'r');
if ($child === false) {
    die('could not open child.php');
}
// Do some work, while already doing other
// work in the child process.
// get response from child (if any) as soon as it's ready:
$response = stream_get_contents($child);
// release the stream once the response has been read
fclose($child);