
k3priorityqueue


Priority queue with weighted producer support.

k3priorityqueue is a component of the pykit3 project, a Python 3 toolkit.

Installation

pip install k3priorityqueue

Quick Start

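PriorityQueue pops items from producers in proportion to their priority weights. For example, if producers A, B and C have priorities 1, 3 and 7, items are popped in a 1:3:7 ratio over a long enough run. The example below uses three producers with priorities 1, 2 and 3: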

import k3priorityqueue

producers = (
    # id, priority, iterable
    (1, 1, [1] * 10),
    (2, 2, [2] * 10),
    (3, 3, [3] * 10),
)

pq = k3priorityqueue.PriorityQueue()

for pid, prio, itr in producers:
    pq.add_producer(pid, prio, itr)

count = {}
for _ in range(12):
    val = pq.get()
    count[val] = count.get(val, 0) + 1

print('counts:', repr(count))
# Output shows ratio approximately 1:2:3
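
To see why the ratio emerges, the scheduling rule can be imitated with the standard library alone. This is a minimal sketch, not the library's implementation: `simulate_weighted_pops` is a hypothetical helper, it uses `heapq` instead of `k3heap`, and it assumes each pop costs a producer `1/priority`, following the comparison key described for `Producer` in the API reference. `Fraction` is used only to keep the arithmetic exact.

```python
import heapq
from collections import Counter
from fractions import Fraction


def simulate_weighted_pops(priorities, n_pops):
    """Return the producer ids popped, in order, for a {id: priority} mapping."""
    # Heap entries are (consumed, item_cost, producer_id). The smallest entry
    # is the least-consumed producer; ties go to the lower item cost.
    heap = [(Fraction(0), Fraction(1, prio), pid) for pid, prio in priorities.items()]
    heapq.heapify(heap)

    order = []
    for _ in range(n_pops):
        consumed, cost, pid = heapq.heappop(heap)
        order.append(pid)
        # Charge the producer for this pop and put it back.
        heapq.heappush(heap, (consumed + cost, cost, pid))
    return order


counts = Counter(simulate_weighted_pops({"A": 1, "B": 3, "C": 7}, 11))
print(dict(counts))  # A is popped once, B three times, C seven times
```

After 11 pops every producer has been charged the same total cost, which is why the counts line up with the priorities at that point.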

API Reference

k3priorityqueue

PriorityQueue is a queue with priority support:

The number of items popped from each producer is proportional to its priority: if three producers A, B and C have priorities 1, 3 and 7 and the queue runs long enough, the items popped from A, B and C are expected to be in the ratio 1:3:7.

import k3priorityqueue

producers = (
    # id, priority, iterable
    (1, 1, [1] * 10),
    (2, 2, [2] * 10),
    (3, 3, [3] * 10),
)

pq = k3priorityqueue.PriorityQueue()

for pid, prio, itr in producers:
    pq.add_producer(pid, prio, itr)

count = {}

for _ in range(12):
    val = pq.get()
    count[val] = count.get(val, 0) + 1
    print(val)

print('respect priority ratio: counts:', repr(count))

# Keep popping until every producer is exhausted.
while True:
    try:
        val = pq.get()
    except k3priorityqueue.Empty:
        break

    count[val] = count.get(val, 0) + 1
    print(val)

print('consumed all: counts:', repr(count))
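
The "pop until `Empty`" loop above can be wrapped in a small generator. The sketch below is an illustration only: `drain` is a hypothetical helper, not part of k3priorityqueue, and it is demonstrated against the standard library's `queue.Queue` so that it runs without the package installed.

```python
import queue


def drain(get, empty_exc):
    """Yield items from `get()` until it raises `empty_exc`."""
    while True:
        try:
            item = get()
        except empty_exc:
            return
        yield item


# Stand-in source: a standard-library queue that raises queue.Empty when drained.
q = queue.Queue()
for n in (1, 2, 3):
    q.put(n)

items = list(drain(q.get_nowait, queue.Empty))
print(items)  # [1, 2, 3]
```

With this library, the analogous call would presumably be `drain(pq.get, k3priorityqueue.Empty)`, since `pq.get()` raises `k3priorityqueue.Empty` once no producer has any item left.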

PriorityQueue

Bases: object

A queue managing several Producer instances. It produces items according to Producer.priority.

Internally, two heaps store the producers: one holds all consumable producers, the other holds all empty producers.

When PriorityQueue.get() is called and finds that a producer has become empty, it removes the producer from the consumable heap, puts it into the empty producer heap, and never tries to get an item from it again.

To re-enable a producer, call PriorityQueue.add_producer() with the same producer_id.
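
The bookkeeping described above can be sketched in a few lines. This is a deliberately simplified stand-in, not the library's code: `TwoPoolQueue` and its local `Empty` class are hypothetical, plain dicts replace the two heaps, and priorities are left out so that only the consumable/empty transition and the re-enable step are visible.

```python
class Empty(Exception):
    """Raised when no producer has any item left (local stand-in)."""


class TwoPoolQueue:
    """Simplified illustration: dicts instead of heaps, no priorities."""

    def __init__(self):
        self.consumable = {}  # producer_id -> iterator that may still yield
        self.empty = {}       # producer_id -> exhausted iterator

    def add_producer(self, producer_id, iterable):
        # Adding an existing id resets it and makes it consumable again.
        self.empty.pop(producer_id, None)
        self.consumable[producer_id] = iter(iterable)

    def get(self):
        while self.consumable:
            producer_id, it = next(iter(self.consumable.items()))
            try:
                return next(it)
            except StopIteration:
                # Park the exhausted producer; it is not polled again.
                self.empty[producer_id] = self.consumable.pop(producer_id)
        raise Empty("no more queue has any item")


q = TwoPoolQueue()
q.add_producer("a", [1, 2])
first_run = [q.get(), q.get()]

try:
    q.get()
    exhausted = False
except Empty:
    exhausted = True

# Re-adding the same id moves the producer back to the consumable pool.
q.add_producer("a", [3])
second_run = q.get()
print(first_run, exhausted, second_run)  # [1, 2] True 3
```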

Source code in k3priorityqueue/priorityqueue.py
class PriorityQueue(object):
    """
    A queue managing several `Producer` instances.
    It produces items by `Producer.priority`.

    Internally, two heaps store the producers:
    one holds all consumable producers, the other holds all empty producers.

    When `PriorityQueue.get()` is called and finds that a producer has become empty,
    it removes the producer from the consumable heap, puts it into the empty producer
    heap, and never tries to get an item from it again.

    To re-enable a producer, call `PriorityQueue.add_producer()` with the same
    `producer_id`.
    """

    def __init__(self):
        self.producer_by_id = {}

        # empty_heap: stores all empty Producers.
        #             An empty producer is one that has raised an Empty
        #             exception from Producer.get(), which means it has no
        #             more items to produce.
        #
        # consumable_heap: stores all non-empty Producers; a less consumed
        #             Producer sits higher in the heap.
        self.empty_heap = k3heap.RefHeap()
        self.consumable_heap = k3heap.RefHeap()

        self.heap_lock = threading.RLock()

    def add_producer(self, producer_id, priority, iterable):
        """
        Add a new producer or reset an existing producer.
            add_producer(int, float, iter)
        Args:
            producer_id: the identity of the producer.

            priority: the priority of this producer; it also acts as the weight of
                      the items it produces.
            iterable: the producer implementation: anything that can be used in a
                      `for-in` loop, such as `[1, 2, 3]` or `range(10)`.
        """
        with self.heap_lock:
            if producer_id not in self.producer_by_id:
                p = Producer(producer_id, priority, iterable)
                self.producer_by_id[producer_id] = p
            else:
                # If the producer exists, update its priority and iterable.
                p = self.producer_by_id[producer_id]
                p.set_priority(priority)
                p.set_iterable(iterable)

            # Whenever a (possibly existing) producer is added, treat it as consumable.
            self._remove_from_heaps(p)
            self.consumable_heap.push(p)

    def remove_producer(self, producer_id, ignore_not_found=False):
        """
        Remove a producer by its id.
            remove_producer(int, bool)
        Args:
            producer_id: the id of the producer to remove.
            ignore_not_found: if `False`, raise a `KeyError` when no such `producer_id` is found.
                Defaults to `False`.
        """
        with self.heap_lock:
            if producer_id not in self.producer_by_id and ignore_not_found:
                return

            p = self.producer_by_id[producer_id]
            self._remove_from_heaps(p)

            del self.producer_by_id[producer_id]

    def _remove_from_heaps(self, producer):
        try:
            self.empty_heap.remove(producer)
        except k3heap.NotFound:
            pass

        try:
            self.consumable_heap.remove(producer)
        except k3heap.NotFound:
            pass

    def get(self):
        """
        Returns an item.
        """
        while True:
            with self.heap_lock:
                try:
                    p = self.consumable_heap.get()
                except k3heap.Empty:
                    raise Empty("no more queue has any item")

                try:
                    # NOTE: if p.iterable blocks, everything is blocked
                    val = p.get()

                except Empty:
                    self.consumable_heap.remove(p)
                    self.empty_heap.push(p)

                    # try next consumable queue
                    continue

                self.consumable_heap.sift(p)

                return val

    def __str__(self):
        qs = []
        for cq in self.producer_by_id.values():
            qs.append(str(cq))

        return " ".join(qs)

add_producer(producer_id, priority, iterable)

Add a new producer or reset an existing producer.

add_producer(int, float, iter)

Args:
    producer_id: the identity of the producer.
    priority: the priority of this producer; it also acts as the weight of the items it produces.
    iterable: the producer implementation: anything that can be used in a `for-in` loop, such as `[1, 2, 3]` or `range(10)`.

Source code in k3priorityqueue/priorityqueue.py
def add_producer(self, producer_id, priority, iterable):
    """
    Add a new producer or reset an existing producer.
        add_producer(int, float, iter)
    Args:
        producer_id: the identity of the producer.

        priority: the priority of this producer; it also acts as the weight of
                  the items it produces.
        iterable: the producer implementation: anything that can be used in a
                  `for-in` loop, such as `[1, 2, 3]` or `range(10)`.
    """
    with self.heap_lock:
        if producer_id not in self.producer_by_id:
            p = Producer(producer_id, priority, iterable)
            self.producer_by_id[producer_id] = p
        else:
            # If the producer exists, update its priority and iterable.
            p = self.producer_by_id[producer_id]
            p.set_priority(priority)
            p.set_iterable(iterable)

        # Whenever a (possibly existing) producer is added, treat it as consumable.
        self._remove_from_heaps(p)
        self.consumable_heap.push(p)

get()

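Returns an item from the producer at the top of the consumable heap. Raises Empty when no producer has any item left.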

Source code in k3priorityqueue/priorityqueue.py
def get(self):
    """
    Returns an item.
    """
    while True:
        with self.heap_lock:
            try:
                p = self.consumable_heap.get()
            except k3heap.Empty:
                raise Empty("no more queue has any item")

            try:
                # NOTE: if p.iterable blocks, everything is blocked
                val = p.get()

            except Empty:
                self.consumable_heap.remove(p)
                self.empty_heap.push(p)

                # try next consumable queue
                continue

            self.consumable_heap.sift(p)

            return val

remove_producer(producer_id, ignore_not_found=False)

Remove a producer by its id.

remove_producer(int, bool)

Args:
    producer_id: the id of the producer to remove.
    ignore_not_found: if False, raise a KeyError when no such producer_id is found. Defaults to False.

Source code in k3priorityqueue/priorityqueue.py
def remove_producer(self, producer_id, ignore_not_found=False):
    """
    Remove a producer by its id.
        remove_producer(int, bool)
    Args:
        producer_id: the id of the producer to remove.
        ignore_not_found: if `False`, raise a `KeyError` when no such `producer_id` is found.
            Defaults to `False`.
    """
    with self.heap_lock:
        if producer_id not in self.producer_by_id and ignore_not_found:
            return

        p = self.producer_by_id[producer_id]
        self._remove_from_heaps(p)

        del self.producer_by_id[producer_id]
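
The effect of `ignore_not_found` can be checked with a dict-only imitation of the lookup logic in the listing above. This is a sketch for illustration: the free function below is hypothetical and leaves out the lock and the heap clean-up that the real method performs.

```python
def remove_producer(producer_by_id, producer_id, ignore_not_found=False):
    """Dict-only imitation of the lookup logic shown in the listing."""
    if producer_id not in producer_by_id and ignore_not_found:
        return
    # A missing id reaches this line only when ignore_not_found is False,
    # so the KeyError propagates to the caller.
    del producer_by_id[producer_id]


producers = {1: "producer-1"}

remove_producer(producers, 1)                         # removes id 1
remove_producer(producers, 1, ignore_not_found=True)  # already gone: silently ignored

try:
    remove_producer(producers, 1)
    raised = False
except KeyError:
    raised = True

print(producers, raised)  # {} True
```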

Producer

Bases: object

An internal class which tracks consumption state. It provides a get() method to retrieve an item from it, and has a priority attribute that specifies its priority.

A Producer instance can be compared to another with the operator <:

  • a<b means that a is less consumed and would cost less for each consumption. The comparison key is: (1/priority * nr_of_get, 1/priority).

Thus a smaller Producer is less consumed and should be consumed first.

Attributes:
    get(): Returns an item.
    set_priority(float): Sets the producer priority.
    set_iterable(iterable): Sets the producer iterable.
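
A few comparisons make the ordering concrete. The sketch below evaluates the key exactly as written in the text, `(1/priority * nr_of_get, 1/priority)`; `cmp_key` is a hypothetical standalone function, and `Fraction` is an assumption made here to avoid floating-point noise (the class itself stores floats).

```python
from fractions import Fraction


def cmp_key(priority, nr_of_get):
    """Comparison key from the text: (1/priority * nr_of_get, 1/priority)."""
    item_cost = Fraction(1, priority)
    return (item_cost * nr_of_get, item_cost)


# Both unconsumed: the first elements tie at 0, so the cheaper
# (higher-priority) producer sorts first.
fresh_high_before_fresh_low = cmp_key(7, 0) < cmp_key(1, 0)

# Priority 3 after two gets has consumed 2/3, still less than
# priority 1 after one get (consumed 1), so it goes first again.
busy_high_before_busy_low = cmp_key(3, 2) < cmp_key(1, 1)

# An untouched low-priority producer beats a high-priority one
# that has already been served once.
fresh_low_before_served_high = cmp_key(1, 0) < cmp_key(7, 1)

print(fresh_high_before_fresh_low, busy_high_before_busy_low, fresh_low_before_served_high)
```

All three comparisons evaluate to `True`: the amount consumed is the primary criterion, and the per-item cost only breaks ties.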

Source code in k3priorityqueue/priorityqueue.py
class Producer(object):
    """
    An internal class which tracks consumption state.
    It provides a `get()` method to retrieve an item from it.
    It has an attribute `priority` to specify its priority.

    A `Producer` instance can be compared to another with the operator `<`:

    -   `a<b` means that a is less consumed and would cost less for each
        consumption.
        The comparison key is: `(1/priority * nr_of_get, 1/priority)`.

    Thus a smaller `Producer` is less consumed and should be consumed first.
    Attributes:
        get():                  Returns an item.
        set_priority(float):    Set producer priority
        set_iterable(iterable): Set producer iterable
    """

    def __init__(self, producer_id, priority, iterable):
        self.consumed = 0
        self.iterable_lock = threading.RLock()
        self.stat = {"get": 0}
        self.cmp_key = (0, 0)

        self.producer_id = producer_id
        self.set_priority(priority)
        self.set_iterable(iterable)

    def get(self):
        with self.iterable_lock:
            try:
                val = next(self.iterable)
                self.stat["get"] += 1
                self.consume()
                return val
            except StopIteration:
                raise Empty("no more item in " + str(self))

    def set_priority(self, priority):
        priority = float(priority)

        if priority <= 0:
            raise ValueError("priority can not be less than or equal to 0: " + str(priority))

        self.priority = priority
        self.item_cost = default_priority / float(self.priority)
        self.cmp_key = (self.consumed, self.item_cost)

    def set_iterable(self, iterable):
        self.iterable = iter(iterable)

    def consume(self):
        self.consumed += self.item_cost
        self.cmp_key = (self.consumed, self.item_cost)

    def __str__(self):
        return "[{producer_id}={priority} c={consumed}]".format(
            producer_id=self.producer_id,
            priority=self.priority,
            consumed=self.consumed,
        )

    def __lt__(self, b):
        return self.cmp_key < b.cmp_key

License

The MIT License (MIT) - Copyright (c) 2015 Zhang Yanpo (张炎泼)