Skip to content

Commit 8b4b880

Browse files
committed
feat: implement Circular Queue and add tests
1 parent 000dc9d commit 8b4b880

File tree

4 files changed

+535
-16
lines changed

4 files changed

+535
-16
lines changed

blocking.go

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ type Blocking[T comparable] struct {
2525
capacity *int
2626

2727
// synchronization
28-
lock sync.Mutex
28+
lock sync.RWMutex
2929
notEmptyCond *sync.Cond
3030
notFullCond *sync.Cond
3131
}
@@ -48,7 +48,7 @@ func NewBlocking[T comparable](
4848
elementsIndex: 0,
4949
initialLen: len(elems),
5050
capacity: options.capacity,
51-
lock: sync.Mutex{},
51+
lock: sync.RWMutex{},
5252
}
5353

5454
queue.notEmptyCond = sync.NewCond(&queue.lock)
@@ -163,8 +163,8 @@ func (bq *Blocking[T]) Clear() []T {
163163
// Iterator returns an iterator over the elements in the queue.
164164
// It removes the elements from the queue.
165165
func (bq *Blocking[T]) Iterator() <-chan T {
166-
bq.lock.Lock()
167-
defer bq.lock.Unlock()
166+
bq.lock.RLock()
167+
defer bq.lock.RUnlock()
168168

169169
// use a buffered channel to avoid blocking the iterator.
170170
iteratorCh := make(chan T, bq.size())
@@ -190,8 +190,8 @@ func (bq *Blocking[T]) Iterator() <-chan T {
190190
// Peek retrieves but does not return the head of the queue.
191191
// If no element is available it returns an ErrNoElementsAvailable error.
192192
func (bq *Blocking[T]) Peek() (v T, _ error) {
193-
bq.lock.Lock()
194-
defer bq.lock.Unlock()
193+
bq.lock.RLock()
194+
defer bq.lock.RUnlock()
195195

196196
if bq.isEmpty() {
197197
return v, ErrNoElementsAvailable
@@ -223,16 +223,16 @@ func (bq *Blocking[T]) PeekWait() T {
223223

224224
// Size returns the number of elements in the queue.
225225
func (bq *Blocking[T]) Size() int {
226-
bq.lock.Lock()
227-
defer bq.lock.Unlock()
226+
bq.lock.RLock()
227+
defer bq.lock.RUnlock()
228228

229229
return bq.size()
230230
}
231231

232232
// Contains returns true if the queue contains the given element.
233233
func (bq *Blocking[T]) Contains(elem T) bool {
234-
bq.lock.Lock()
235-
defer bq.lock.Unlock()
234+
bq.lock.RLock()
235+
defer bq.lock.RUnlock()
236236

237237
for i := range bq.elements[bq.elementsIndex:] {
238238
if bq.elements[i] == elem {
@@ -245,8 +245,8 @@ func (bq *Blocking[T]) Contains(elem T) bool {
245245

246246
// IsEmpty returns true if the queue is empty.
247247
func (bq *Blocking[T]) IsEmpty() bool {
248-
bq.lock.Lock()
249-
defer bq.lock.Unlock()
248+
bq.lock.RLock()
249+
defer bq.lock.RUnlock()
250250

251251
return bq.isEmpty()
252252
}

circular.go

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
package queue
2+
3+
import (
4+
"sync"
5+
)
6+
7+
// Ensure Priority implements the Queue interface.
8+
var _ Queue[any] = (*Circular[any])(nil)
9+
10+
// Circular is a Queue implementation.
11+
// A circular queue is a queue that uses a fixed-size slice as if it were connected end-to-end.
12+
// When the queue is full, adding a new element to the queue overwrites the oldest element.
13+
type Circular[T comparable] struct {
14+
initialElements []T
15+
elems []T
16+
head int
17+
tail int
18+
size int
19+
20+
// synchronization
21+
lock sync.RWMutex
22+
}
23+
24+
// NewCircular creates a new Circular Queue containing the given elements.
25+
func NewCircular[T comparable](
26+
givenElems []T,
27+
capacity int,
28+
opts ...Option,
29+
) *Circular[T] {
30+
options := options{
31+
capacity: &capacity,
32+
}
33+
34+
for _, o := range opts {
35+
o.apply(&options)
36+
}
37+
38+
elems := make([]T, *options.capacity)
39+
40+
copy(elems, givenElems)
41+
42+
initialElems := make([]T, len(givenElems))
43+
44+
copy(initialElems, givenElems)
45+
46+
tail := 0
47+
48+
size := len(elems)
49+
50+
if len(initialElems) < len(elems) {
51+
tail = len(initialElems)
52+
size = len(initialElems)
53+
}
54+
55+
return &Circular[T]{
56+
initialElements: initialElems,
57+
elems: elems,
58+
head: 0,
59+
tail: tail,
60+
size: size,
61+
lock: sync.RWMutex{},
62+
}
63+
}
64+
65+
// ==================================Insertion=================================
66+
67+
// Offer adds an element into the queue.
68+
// If the queue is full then the oldest item is overwritten.
69+
func (q *Circular[T]) Offer(item T) error {
70+
q.lock.Lock()
71+
defer q.lock.Unlock()
72+
73+
if q.size < len(q.elems) {
74+
q.size++
75+
}
76+
77+
q.elems[q.tail] = item
78+
q.tail = (q.tail + 1) % len(q.elems)
79+
80+
return nil
81+
}
82+
83+
// Reset resets the queue to its initial state.
84+
func (q *Circular[T]) Reset() {
85+
q.lock.Lock()
86+
defer q.lock.Unlock()
87+
88+
copy(q.elems, q.initialElements)
89+
90+
q.head = 0
91+
q.tail = 0
92+
q.size = len(q.initialElements)
93+
94+
if len(q.initialElements) < len(q.elems) {
95+
q.tail = len(q.initialElements)
96+
}
97+
}
98+
99+
// ===================================Removal==================================
100+
101+
// Get returns the element at the head of the queue.
102+
func (q *Circular[T]) Get() (v T, _ error) {
103+
q.lock.Lock()
104+
defer q.lock.Unlock()
105+
106+
return q.get()
107+
}
108+
109+
// Clear removes all elements from the queue.
110+
func (q *Circular[T]) Clear() []T {
111+
q.lock.Lock()
112+
defer q.lock.Unlock()
113+
114+
elems := make([]T, 0, q.size)
115+
116+
for {
117+
elem, err := q.get()
118+
if err != nil {
119+
break
120+
}
121+
122+
elems = append(elems, elem)
123+
}
124+
125+
// clear the queue
126+
q.head = 0
127+
q.tail = 0
128+
129+
return elems
130+
}
131+
132+
// Iterator returns an iterator over the elements in the queue.
133+
// It removes the elements from the queue.
134+
func (q *Circular[T]) Iterator() <-chan T {
135+
q.lock.RLock()
136+
defer q.lock.RUnlock()
137+
138+
// use a buffered channel to avoid blocking the iterator.
139+
iteratorCh := make(chan T, q.size)
140+
141+
// close the channel when the function returns.
142+
defer close(iteratorCh)
143+
144+
// iterate over the elements and send them to the channel.
145+
for {
146+
elem, err := q.get()
147+
if err != nil {
148+
break
149+
}
150+
151+
iteratorCh <- elem
152+
}
153+
154+
return iteratorCh
155+
}
156+
157+
// =================================Examination================================
158+
159+
// IsEmpty returns true if the queue is empty.
160+
func (q *Circular[T]) IsEmpty() bool {
161+
q.lock.RLock()
162+
defer q.lock.RUnlock()
163+
164+
return q.isEmpty()
165+
}
166+
167+
// Contains returns true if the queue contains the given element.
168+
func (q *Circular[T]) Contains(elem T) bool {
169+
q.lock.RLock()
170+
defer q.lock.RUnlock()
171+
172+
if q.isEmpty() {
173+
return false // queue is empty, item not found
174+
}
175+
176+
for i := q.head; i < q.size; i++ {
177+
idx := (q.head + i) % len(q.elems)
178+
179+
if q.elems[idx] == elem {
180+
return true // item found
181+
}
182+
}
183+
184+
return false // item not found
185+
}
186+
187+
// Peek returns the element at the head of the queue.
188+
func (q *Circular[T]) Peek() (v T, _ error) {
189+
q.lock.RLock()
190+
defer q.lock.RUnlock()
191+
192+
if q.isEmpty() {
193+
return v, ErrNoElementsAvailable
194+
}
195+
196+
return q.elems[q.head], nil
197+
}
198+
199+
// Size returns the number of elements in the queue.
200+
func (q *Circular[T]) Size() int {
201+
q.lock.RLock()
202+
defer q.lock.RUnlock()
203+
204+
return q.size
205+
}
206+
207+
// ===================================Helpers==================================
208+
209+
// Get returns the element at the head of the queue.
210+
func (q *Circular[T]) get() (v T, _ error) {
211+
if q.isEmpty() {
212+
return v, ErrNoElementsAvailable
213+
}
214+
215+
item := q.elems[q.head]
216+
q.head = (q.head + 1) % len(q.elems)
217+
q.size--
218+
219+
return item, nil
220+
}
221+
222+
func (q *Circular[T]) isEmpty() bool {
223+
return q.size == 0
224+
}

0 commit comments

Comments
 (0)