Welcome to mirror list, hosted at ThFree Co, Russian Federation.

CommandPool.php « src « aws-sdk-php « aws - github.com/nextcloud/3rdparty.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
blob: 2a101ee5bb4a005e2cbfd796a9f7bd58813d6808 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
<?php
namespace Aws;

use GuzzleHttp\Promise\PromisorInterface;
use GuzzleHttp\Promise\EachPromise;

/**
 * Sends and iterator of commands concurrently using a capped pool size.
 *
 * The pool will read command objects from an iterator until it is cancelled or
 * until the iterator is consumed.
 */
class CommandPool implements PromisorInterface
{
    /** @var EachPromise */
    private $each;

    /**
     * The CommandPool constructor accepts a hash of configuration options:
     *
     * - concurrency: (callable|int) Maximum number of commands to execute
     *   concurrently. Provide a function to resize the pool dynamically. The
     *   function will be provided the current number of pending requests and
     *   is expected to return an integer representing the new pool size limit.
     * - before: (callable) function to invoke before sending each command. The
     *   before function accepts the command and the key of the iterator of the
     *   command. You can mutate the command as needed in the before function
     *   before sending the command.
     * - fulfilled: (callable) Function to invoke when a promise is fulfilled.
     *   The function is provided the result object, id of the iterator that the
     *   result came from, and the aggregate promise that can be resolved/rejected
     *   if you need to short-circuit the pool.
     * - rejected: (callable) Function to invoke when a promise is rejected.
     *   The function is provided an AwsException object, id of the iterator that
     *   the exception came from, and the aggregate promise that can be
     *   resolved/rejected if you need to short-circuit the pool.
     * - preserve_iterator_keys: (bool) Retain the iterator key when generating
     *   the commands.
     *
     * @param AwsClientInterface $client   Client used to execute commands.
     * @param array|\Iterator    $commands Iterable that yields commands.
     * @param array              $config   Associative array of options.
     */
    public function __construct(
        AwsClientInterface $client,
        $commands,
        array $config = []
    ) {
        if (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        $before = $this->getBefore($config);
        $mapFn = function ($commands) use ($client, $before, $config) {
            foreach ($commands as $key => $command) {
                if (!($command instanceof CommandInterface)) {
                    throw new \InvalidArgumentException('Each value yielded by '
                        . 'the iterator must be an Aws\CommandInterface.');
                }
                if ($before) {
                    $before($command, $key);
                }
                if (!empty($config['preserve_iterator_keys'])) {
                    yield $key => $client->executeAsync($command);
                } else {
                    yield $client->executeAsync($command);
                }
            }
        };

        $this->each = new EachPromise($mapFn($commands), $config);
    }

    /**
     * @return \GuzzleHttp\Promise\PromiseInterface
     */
    public function promise()
    {
        return $this->each->promise();
    }

    /**
     * Executes a pool synchronously and aggregates the results of the pool
     * into an indexed array in the same order as the passed in array.
     *
     * @param AwsClientInterface $client   Client used to execute commands.
     * @param mixed              $commands Iterable that yields commands.
     * @param array              $config   Configuration options.
     *
     * @return array
     * @see \Aws\CommandPool::__construct for available configuration options.
     */
    public static function batch(
        AwsClientInterface $client,
        $commands,
        array $config = []
    ) {
        $results = [];
        self::cmpCallback($config, 'fulfilled', $results);
        self::cmpCallback($config, 'rejected', $results);

        return (new self($client, $commands, $config))
            ->promise()
            ->then(static function () use (&$results) {
                ksort($results);
                return $results;
            })
            ->wait();
    }

    /**
     * @return callable
     */
    private function getBefore(array $config)
    {
        if (!isset($config['before'])) {
            return null;
        }

        if (is_callable($config['before'])) {
            return $config['before'];
        }

        throw new \InvalidArgumentException('before must be callable');
    }

    /**
     * Adds an onFulfilled or onRejected callback that aggregates results into
     * an array. If a callback is already present, it is replaced with the
     * composed function.
     *
     * @param array $config
     * @param       $name
     * @param array $results
     */
    private static function cmpCallback(array &$config, $name, array &$results)
    {
        if (!isset($config[$name])) {
            $config[$name] = function ($v, $k) use (&$results) {
                $results[$k] = $v;
            };
        } else {
            $currentFn = $config[$name];
            $config[$name] = function ($v, $k) use (&$results, $currentFn) {
                $currentFn($v, $k);
                $results[$k] = $v;
            };
        }
    }
}