parallel\run

(1.0.0)

parallel\runВыполнение

Описание

parallel\run(Closure $task): ?Future

Планирует task для выполнения в parallel.

parallel\run(Closure $task, array $argv): ?Future

Планирует task для выполнения в parallel, передавая argv во время выполнения.

Автоматическое планирование

Если \parallel\Runtime внутренне созданный и кешированный предыдущим вызовом parallel\run() бездействует, он будет использоваться для выполнения задачи. Если ни один \parallel\Runtime не бездействует, parallel создаст и сохранит в кеш \parallel\Runtime.

Замечание:

Объекты \parallel\Runtime, созданные программистом, не используются для автоматического планирования.

Список параметров

task

Замыкание (Closure) со специфическими характеристиками.

argv

Массив (array) аргументов с конкретными характеристиками, которые будут переданы task во время выполнения.

Характеристики задачи

Замыкания, запланированные для параллельного выполнения, не должны:

  • принимать или возвращать значения по ссылке
  • принимать или возвращать внутренние объекты (смотрите примечания)
  • выполнять ограниченный набор инструкций

В замыканиях, предназначенных для параллельного выполнения, запрещены следующие инструкции:

  • yield
  • use by-reference
  • declare class
  • declare named function

Замечание:

Вложенные замыкания могут использовать yield или передачу значения по ссылке, но не должны содержать объявления классов или именованных функций.

Замечание:

Никакие инструкции не запрещены в файлах, которые может включать задача.

Аргументы характеристик

Аргументы не должны:

  • содержать ссылки
  • содержать ресурсы
  • содержать внутренние объекты (смотрите примечания)

Замечание:

В случае ресурсов файлового потока ресурс будет преобразован в файловый дескриптор и передан как целое число (int), где это возможно, не поддерживается в Windows.

Примечания к внутренним объектам

Внутренние объекты обычно используют настраиваемую структуру, которую нельзя безопасно скопировать по значению, в PHP в настоящее время отсутствует механизм для этого (без сериализации), и поэтому могут использоваться только объекты, которые не используют настраиваемую структуру.

Некоторые внутренние объекты не используют настраиваемую структуру, например, parallel\Events\Event и поэтому могут использоваться совместно.

Замыкания - это особый вид внутреннего объекта, который поддерживает копирование по значению, поэтому может использоваться совместно.

Каналы играют ключевую роль в написании параллельного кода и при необходимости поддерживают одновременный доступ и выполнение, поэтому могут использоваться совместно.

Внимание

Пользовательский класс, расширяющий внутренний класс, может использовать настраиваемую структуру, определённую внутренним классом, и в этом случае они не могут быть безопасно скопированы по значению и поэтому не могут использоваться совместно.

Возвращаемые значения

Внимание

Нельзя игнорировать возвращаемый parallel\Future, если задача содержит оператор return или throw.

Исключения

Внимание

Выбрасывает parallel\Runtime\Error\Closed, если parallel\Runtime был закрыт.

Внимание

Выбрасывает parallel\Runtime\Error\IllegalFunction, если task является замыканием, созданным из внутренней функции.

Внимание

Выбрасывает parallel\Runtime\Error\IllegalInstruction, если task содержит недопустимые инструкции.

Внимание

Выбрасывает parallel\Runtime\Error\IllegalParameter, если task принимает или argv содержит недопустимые переменные.

Внимание

Выбрасывает parallel\Runtime\Error\IllegalReturn, если task возвращается некорректно.

Смотрите также

add a note add a note

User Contributed Notes 3 notes

up
21
john_2885 at yahoo dot com
4 years ago
Here's a more substantial example of how to use the run functional API.

<?php
/*********************************************
* Sample parallel functional API
*
* Scenario
* -------------------------------------------
* Given a large number of rows of
* data to process, divide the work amongst
* a set of workers.  Each worker is responsible
* for finishing their assigned task.
*
* In the code below, assume we have arbitrary
* start and end IDs (rows) - we will try to
* divide the number of IDs (rows) evenly
* across 8 workers.  The workers will get the
* following batches to process to completion:
*
* Total number of IDs (rows): 1371129
* Each worker will get 171392 IDs to process
*
* Worker 1: IDs from 11001 to 182393
* Worker 2: IDs from 182393 to 353785
* Worker 3: IDs from 353785 to 525177
* Worker 4: IDs from 525177 to 696569
* Worker 5: IDs from 696569 to 867961
* Worker 6: IDs from 867961 to 1039353
* Worker 7: IDs from 1039353 to 1210745
* Worker 8: IDs from 1210745 to 1382130
*
* Each worker then processes 5000 rows at a time
* until they are done with their assigned work
*
*********************************************/

use \parallel\{Runtime, Future, Channel, Events};

$minId = 11001;
$maxId = 1382130;
$workers = 8;
$totalIds = $maxId - $minId;
// Try to divide IDs evenly across the number of workers
$batchSize = ceil($totalIds / $workers);
// The last batch gets whatever is left over
$lastBatch = $totalIds % $batchSize;
// The number of IDs (rows) to divide the overall
// task into sub-batches
$rowsToFetch = 5000;

print
"Total IDs: " . $totalIds . "\n";
print
"Batch Size: " . $batchSize . "\n";
print
"Last Batch: " . $lastBatch . "\n";

$producer = function(int $worker, int $startId, int $endId, int $fetchSize) {
   
$tempMinId = $startId;
   
$tempMaxId = $tempMinId + $fetchSize;
   
$fetchCount = 1;
   
    print
"Worker " . $worker . " working on IDs from " . $startId . " to " . $endId . "\n";
   
    while(
$tempMinId < $endId) {
        for(
$i = $tempMinId; $i < $tempMaxId; $i++) {
           
$usleep = rand(500000, 1000000);
           
usleep($usleep);
            print
"Worker " . $worker " finished batch " . $fetchCount . " from ID " . $tempMinId . " to " . $tempMaxId . "\n";
           
// Need to explicitly break out of the for loop once complete or else it will forever process only the first sub-batch
           
break;
        }
       
       
// Now we move on to the next sub-batch for this worker
       
$tempMinId = $tempMaxId;
       
$tempMaxId = $tempMinId + $fetchSize;
        if(
$tempMaxId > $endId) {
           
$tempMaxId = $endId;
        }
       
// Introduce some timing randomness
       
$sleep = rand(1,5);
       
sleep($sleep);
       
$fetchCount++;
    }
   
   
// This worker has completed their entire batch
   
print "Worker " . $worker " finished\n";
   
};

// Create our workers and have them start working on their task
// In this case, it's a set of 171392 IDs to process
for($i = 0; $i < $workers; $i++) {
   
$startId = $minId + ($i * $batchSize);
   
$endId = $startId + $batchSize;
    if(
$i == ($workers - 1)) {
       
$endId = $maxId;
    }
    \
parallel\run($producer, array(($i+1), $startId, $endId, $rowsToFetch));
}

?>
up
9
anonymous user
3 years ago
Although function declaration is not allowed inside thread exec code, include is allowed. So if we want to declare a function, we could write another file that contain the function and include it.
# main.php
<?php
$runtime
= new parallel\Runtime ();
$future = $runtime->run ( function () {
   
$future = $runtime->run ( function () {
        include
"included.php";
        return
add (1, 3);
    }, [ ] );
echo
$future->value ();
# output: 4
# included.php
<?php
function add($a, $b){
    return
$a + $b;
}
up
2
Thierry Kauffmann
3 years ago
<?php

/**
* Sample parralel functional API
* using a generator instead of a static list of items to process
*
* Items to process in parallel come from a generator
* It could be anything : e.g fetch a mysql array, a DirectoryIterator...
* Thus the number of items to process in parallel is NOT known in advance
*
* This algorithm attributes items to each parallel thread dynamically
* As soon as a thread has finished working
* It is assigned a new item to process
* until all items are processed (generator closes)
*
* In this example we process 50 items in 5 parallel threads
* It produces output in this form (output changes at each run) :
*
* ThreadId: 1 => Item: 1 (Start)
* ThreadId: 2 => Item: 2 (Start)
* ThreadId: 3 => Item: 3 (Start)
* ThreadId: 4 => Item: 4 (Start)
* ThreadId: 5 => Item: 5 (Start)
* ThreadId: 5 => Item: 5 Sleep: 3s (End)
* ThreadId: 5 => Item: 6 (Start)
* ThreadId: 3 => Item: 3 Sleep: 4s (End)
* ThreadId: 3 => Item: 7 (Start)
* ThreadId: 2 => Item: 2 Sleep: 6s (End)
* ThreadId: 2 => Item: 8 (Start)
* ...
* ThreadId: 4 => Item: 44 Sleep: 6s (End)
* ThreadId: 4 => Item: 49 (Start)
* ThreadId: 3 => Item: 46 Sleep: 5s (End)
* ThreadId: 3 => Item: 50 (Start)
* ThreadId: 2 => Item: 43 Sleep: 9s (End)
* Destroy ThreadId: 2
* ThreadId: 1 => Item: 47 Sleep: 5s (End)
* Destroy ThreadId: 1
* ThreadId: 4 => Item: 49 Sleep: 7s (End)
* Destroy ThreadId: 4
* ThreadId: 5 => Item: 48 Sleep: 10s (End)
* Destroy ThreadId: 5
* ThreadId: 3 => Item: 50 Sleep: 10s (End)
* Destroy ThreadId: 3
*/

use \parallel\{Runtime, Future, Channel, Events};

// Generate list of items to process with a generator
function generator(int $item_count) {
    for (
$i=1; $i <= $item_count; $i++) {
       
yield $i;
    }
}

function
testConcurrency(int $concurrency, int $item_count) {

   
$generator = generator($item_count);

   
// Function executing in each thread. Have a snap for a random time for example !
   
$producer = function (int $item_id) {
       
$seconds = rand(1, 10);
       
sleep($seconds);
        return [
'item_id' => $item_id, 'sleep_seconds' => $seconds];
    };

   
// Fill up threads with initial 'inactive' state
   
$threads = array_fill(1, $concurrency, ['is_active' => false]);

    while (
true) {
       
// Loop through threads until all threads are finished
       
foreach ($threads as $thread_id => $thread) {
            if (!
$thread['is_active'] and $generator->valid()) {
               
// Thread is inactive and generator still has values : run something in the thread
               
$item_id = $generator->current();
               
$threads[$thread_id]['run'] = \parallel\run($producer, [$item_id]);
                echo
"ThreadId: $thread_id => Item: $item_id (Start)\n";
               
$threads[$thread_id]['is_active'] = true;
               
$generator->next();
            } elseif (!isset(
$threads[$thread_id]['run'])) {
               
// Destroy supplementary threads in case generator closes sooner than number of threads
               
echo "Destroy ThreadId: $thread_id\n";
                unset(
$threads[$thread_id]);
            } elseif (
$threads[$thread_id]['run']->done()) {
               
// Thread finished. Get results
               
$item = $threads[$thread_id]['run']->value();
                echo
"ThreadId: $thread_id => Item: {$item['item_id']} Sleep: {$item['sleep_seconds']}s (End)\n";

                if (!
$generator->valid()) {
                   
// Generator is closed then destroy thread
                   
echo "Destroy ThreadId: $thread_id\n";
                    unset(
$threads[$thread_id]);
                } else {
                   
// Thread is ready to run again
                   
$threads[$thread_id]['is_active'] = false;
                }
            }
        }

       
// Escape loop when all threads are destroyed
       
if (empty($threads)) break;
    }
}

$concurrency = 5;
$item_count = 50;

testConcurrency($concurrency, $item_count);

?>
To Top