Queue.pm 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603
  1. package Thread::Queue;
  2. use strict;
  3. use warnings;
  4. our $VERSION = '3.05';
  5. $VERSION = eval $VERSION;
  6. use threads::shared 1.21;
  7. use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
  8. # Carp errors from threads::shared calls should complain about caller
  9. our @CARP_NOT = ("threads::shared");
  # Constructor: build a queue, optionally seeded with an initial list of items.
  #
  #   my $q = Thread::Queue->new(LIST);
  #
  # Every argument is passed through shared_clone() and stored, in order, in a
  # shared array.  The returned object is a shared hash, blessed into the
  # invoking class, whose 'queue' key refers to that array.
  sub new
  {
      my ($class, @initial) = @_;

      # Backing store for the queued items
      my @items :shared;
      push(@items, shared_clone($_)) for @initial;

      # The object itself; it also serves as the lock/condition variable
      my %object :shared;
      $object{'queue'} = \@items;

      return bless(\%object, $class);
  }
  # Append items to the tail of the queue.
  #
  #   $q->enqueue(LIST);
  #
  # Each item is passed through shared_clone() before being stored.
  # Croaks if the queue has already been marked as ended.
  sub enqueue
  {
      my ($self, @new_items) = @_;
      lock(%$self);

      if ($self->{'ENDED'}) {
          require Carp;
          Carp::croak("'enqueue' method called on queue that has been 'end'ed");
      }

      # push() yields the resulting queue length, so a waiting thread is
      # only signalled when the queue is non-empty afterwards.
      my $backlog = $self->{'queue'};
      push(@$backlog, map { shared_clone($_) } @new_items)
          and cond_signal(%$self);
  }
  # Report how many items are currently on the queue.
  #
  #   my $left = $q->pending();
  #
  # Returns the item count, or nothing (undef in scalar context) once the
  # queue has been ended and has been drained.
  sub pending
  {
      my ($self) = @_;
      lock(%$self);

      my $remaining = scalar(@{$self->{'queue'}});

      # An ended, empty queue is reported as "no value" rather than 0
      return if ($self->{'ENDED'} && ! $remaining);

      return $remaining;
  }
  # Declare that no more items will be added to the queue.
  #
  #   $q->end();
  #
  # Sets the queue's 'ENDED' flag, which enqueue() and insert() check before
  # adding items, and which lets blocked dequeue calls stop waiting.
  sub end
  {
      my $self = shift;

      # Lock the queue hash using the same form as every other method here.
      lock(%$self);

      # No more data is coming
      $$self{'ENDED'} = 1;

      # Wake one blocked thread; dequeue() and dequeue_timed() signal again
      # when they see the queue is ended, so the wake-up is passed along.
      cond_signal(%$self);
  }
  # Remove and return items from the head of the queue, blocking if needed.
  #
  #   my $item  = $q->dequeue();
  #   my @items = $q->dequeue(COUNT);
  #
  # COUNT defaults to 1 and is checked by _validate_count().  The call waits
  # until COUNT items are queued or the queue is ended; once ended, it hands
  # back whatever dequeue_nb() can supply.
  sub dequeue
  {
      my $self = shift;
      lock(%$self);

      my $items  = $self->{'queue'};
      my $wanted = 1;
      $wanted = $self->_validate_count(shift) if @_;

      # Sleep until enough items have arrived, or the queue is ended
      until ((@$items >= $wanted) || $self->{'ENDED'}) {
          cond_wait(%$self);
      }

      # Pass the wake-up on if items will remain, or if the queue is ended
      if ((@$items > $wanted) || $self->{'ENDED'}) {
          cond_signal(%$self);
      }

      # Ended queue: take whatever is left without blocking
      if ($self->{'ENDED'}) {
          return $self->dequeue_nb($wanted);
      }

      # Single item is returned as a plain scalar
      return shift(@$items) if ($wanted == 1);

      # Multiple items are returned as a list
      my @taken = map { shift(@$items) } (1 .. $wanted);
      return @taken;
  }
  # Remove and return items from the head of the queue without blocking.
  #
  #   my $item  = $q->dequeue_nb();
  #   my @items = $q->dequeue_nb(COUNT);
  #
  # COUNT defaults to 1 and is checked by _validate_count().  Returns at most
  # COUNT items; fewer (or none) if the queue does not hold that many.
  sub dequeue_nb
  {
      my $self = shift;
      lock(%$self);

      my $items  = $self->{'queue'};
      my $wanted = 1;
      $wanted = $self->_validate_count(shift) if @_;

      # Single item (undef when the queue is empty)
      return shift(@$items) if ($wanted == 1);

      # Multiple items: stop early once the queue runs dry
      my @taken;
      foreach my $slot (1 .. $wanted) {
          @$items or last;
          push(@taken, shift(@$items));
      }
      return @taken;
  }
  # Remove and return items from the head of the queue, blocking for at most
  # a given time.
  #
  #   my $item  = $q->dequeue_timed(TIMEOUT);
  #   my @items = $q->dequeue_timed(TIMEOUT, COUNT);
  #
  # TIMEOUT is checked by _validate_timeout() and defaults to -1 when omitted.
  # Values below 32000000 (about one year in seconds) are treated as relative
  # to the current time; larger values are used as an absolute epoch time.
  # COUNT defaults to 1 and is checked by _validate_count().
  # Whatever dequeue_nb(COUNT) can supply after the wait is returned.
  sub dequeue_timed
  {
      my $self = shift;
      lock(%$self);
      my $queue = $$self{'queue'};

      # Timeout may be relative or absolute
      my $timeout = @_ ? $self->_validate_timeout(shift) : -1;

      # Convert a relative timeout to an absolute time for cond_timedwait().
      if ($timeout < 32000000) {   # Less than about one year: relative
          # Use a sub-second clock when available.  Adding the timeout to
          # the whole-second time() could shorten the wait by up to a
          # second, making fractional timeouts expire early.
          local $@;
          my $now = eval { require Time::HiRes; Time::HiRes::time() };
          $now = time() if (! defined($now));
          $timeout += $now;
      }

      my $count = @_ ? $self->_validate_count(shift) : 1;

      # Wait for requisite number of items, or until timeout
      while ((@$queue < $count) && ! $$self{'ENDED'}) {
          last if (! cond_timedwait(%$self, $timeout));
      }

      # Pass the wake-up on if items will remain, or if the queue is ended
      cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});

      # Get whatever we need off the queue if available
      return $self->dequeue_nb($count);
  }
  # Look at an item on the queue without removing it.
  #
  #   my $item = $q->peek();
  #   my $item = $q->peek(INDEX);
  #
  # INDEX defaults to 0 (the head) and is checked by _validate_index().
  # Returns the element at that position of the queue array.
  sub peek
  {
      my $self = shift;
      lock(%$self);

      my $position = 0;
      $position = $self->_validate_index(shift) if @_;

      return $self->{'queue'}[$position];
  }
  # Insert items at an arbitrary position in the queue.
  #
  #   $q->insert(INDEX, LIST);
  #
  # INDEX is checked by _validate_index().  A negative INDEX counts back from
  # the tail and is clamped to the head; an INDEX past the tail appends.
  # Each item is passed through shared_clone() before being stored.
  # Croaks if the queue has already been marked as ended.
  sub insert
  {
      my $self = shift;
      lock(%$self);

      if ($self->{'ENDED'}) {
          require Carp;
          Carp::croak("'insert' method called on queue that has been 'end'ed");
      }

      my $items    = $self->{'queue'};
      my $position = $self->_validate_index(shift);

      # Nothing to insert
      @_ or return;

      # Negative positions count from the tail, but never before the head
      if ($position < 0) {
          $position += @$items;
          $position = 0 if ($position < 0);
      }

      # Set aside everything at or beyond the insertion point, in order
      my @tail;
      unshift(@tail, pop(@$items)) while (@$items > $position);

      # Append the new items, then put the saved tail back behind them
      push(@$items, map { shared_clone($_) } @_);
      push(@$items, @tail);

      # Let a waiting thread know there is something to fetch
      cond_signal(%$self);
  }
  # Remove and return items from an arbitrary position in the queue, without
  # blocking.
  #
  #   my $item  = $q->extract();
  #   my $item  = $q->extract(INDEX);
  #   my @items = $q->extract(INDEX, COUNT);
  #
  # INDEX defaults to 0 and is checked by _validate_index(); COUNT defaults
  # to 1 and is checked by _validate_count().  A negative INDEX counts back
  # from the tail.  Only items that actually exist in the requested span are
  # returned.
  sub extract
  {
      my $self = shift;
      lock(%$self);

      my $items    = $self->{'queue'};
      my $position = @_ ? $self->_validate_index(shift) : 0;
      my $wanted   = @_ ? $self->_validate_count(shift) : 1;

      if ($position < 0) {
          $position += @$items;
          if ($position < 0) {
              # The span starts before the head: shrink it by the overshoot
              # and serve whatever part still overlaps from the head.
              $wanted += $position;
              return if ($wanted <= 0);
              return $self->dequeue_nb($wanted);
          }
      }

      # Set aside the items that follow the requested span
      my $span_end = $position + $wanted;
      my @tail;
      unshift(@tail, pop(@$items)) while (@$items > $span_end);

      # Pull out the requested items, preserving their order
      my @taken;
      unshift(@taken, pop(@$items)) while (@$items > $position);

      # Put the trailing items back
      push(@$items, @tail);

      # A single item comes back as a scalar, several as a list
      return ($wanted == 1) ? $taken[0] : @taken;
  }
  ### Internal Methods ###

  # Validate an 'index' argument on behalf of a public method.
  #
  #   my $index = $self->_validate_index($value);
  #
  # Returns the value unchanged when it is a defined, integral number.
  # Otherwise croaks, naming the calling method in the message.
  sub _validate_index
  {
      my ($self, $index) = @_;

      my $is_integer = defined($index)
                    && looks_like_number($index)
                    && (int($index) == $index);
      return $index if $is_integer;

      require Carp;
      # Name of the sub that called us, with the object's class prefix removed
      my ($method) = (caller(1))[3];
      my $class_name = ref($self);
      $method =~ s/$class_name\:://;
      $index = 'undef' if (! defined($index));
      Carp::croak("Invalid 'index' argument ($index) to '$method' method");
  }
  # Validate a 'count' argument on behalf of a public method.
  #
  #   my $count = $self->_validate_count($value);
  #
  # Returns the value unchanged when it is a defined, integral number that is
  # at least 1.  Otherwise croaks, naming the calling method in the message.
  sub _validate_count
  {
      my ($self, $count) = @_;

      my $is_positive_integer = defined($count)
                             && looks_like_number($count)
                             && (int($count) == $count)
                             && ($count >= 1);
      return $count if $is_positive_integer;

      require Carp;
      # Name of the sub that called us, with the object's class prefix removed
      my ($method) = (caller(1))[3];
      my $class_name = ref($self);
      $method =~ s/$class_name\:://;
      $count = 'undef' if (! defined($count));
      Carp::croak("Invalid 'count' argument ($count) to '$method' method");
  }
  # Validate a 'timeout' argument on behalf of a public method.
  #
  #   my $timeout = $self->_validate_timeout($value);
  #
  # Returns the value unchanged when it is defined and looks like a number.
  # Otherwise croaks, naming the calling method in the message.
  sub _validate_timeout
  {
      my ($self, $timeout) = @_;

      my $is_number = defined($timeout) && looks_like_number($timeout);
      return $timeout if $is_number;

      require Carp;
      # Name of the sub that called us, with the object's class prefix removed
      my ($method) = (caller(1))[3];
      my $class_name = ref($self);
      $method =~ s/$class_name\:://;
      $timeout = 'undef' if (! defined($timeout));
      Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
  }
  231. 1;
  232. =head1 NAME
  233. Thread::Queue - Thread-safe queues
  234. =head1 VERSION
  235. This document describes Thread::Queue version 3.05
  236. =head1 SYNOPSIS
  237. use strict;
  238. use warnings;
  239. use threads;
  240. use Thread::Queue;
  241. my $q = Thread::Queue->new(); # A new empty queue
  242. # Worker thread
  243. my $thr = threads->create(
  244. sub {
  245. # Thread will loop until no more work
  246. while (defined(my $item = $q->dequeue())) {
  247. # Do work on $item
  248. ...
  249. }
  250. }
  251. );
  252. # Send work to the thread
  253. $q->enqueue($item1, ...);
  254. # Signal that there is no more work to be sent
  255. $q->end();
  256. # Join up with the thread when it finishes
  257. $thr->join();
  258. ...
  259. # Count of items in the queue
  260. my $left = $q->pending();
  261. # Non-blocking dequeue
  262. if (defined(my $item = $q->dequeue_nb())) {
  263. # Work on $item
  264. }
  265. # Blocking dequeue with 5-second timeout
  266. if (defined(my $item = $q->dequeue_timed(5))) {
  267. # Work on $item
  268. }
  269. # Get the second item in the queue without dequeuing anything
  270. my $item = $q->peek(1);
  271. # Insert two items into the queue just behind the head
  272. $q->insert(1, $item1, $item2);
  273. # Extract the last two items on the queue
  274. my ($item1, $item2) = $q->extract(-2, 2);
  275. =head1 DESCRIPTION
  276. This module provides thread-safe FIFO queues that can be accessed safely by
  277. any number of threads.
  278. Any data types supported by L<threads::shared> can be passed via queues:
  279. =over
  280. =item Ordinary scalars
  281. =item Array refs
  282. =item Hash refs
  283. =item Scalar refs
  284. =item Objects based on the above
  285. =back
  286. Ordinary scalars are added to queues as they are.
  287. If not already thread-shared, the other complex data types will be cloned
  288. (recursively, if needed, and including any C<bless>ings and read-only
  289. settings) into thread-shared structures before being placed onto a queue.
  290. For example, the following would cause L<Thread::Queue> to create an empty,
  291. shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
  292. and 'baz' from C<@ary> into it, and then place that shared reference onto
  293. the queue:
  294. my @ary = qw/foo bar baz/;
  295. $q->enqueue(\@ary);
  296. However, for the following, the items are already shared, so their references
  297. are added directly to the queue, and no cloning takes place:
  298. my @ary :shared = qw/foo bar baz/;
  299. $q->enqueue(\@ary);
  300. my $obj = &shared({});
  301. $$obj{'foo'} = 'bar';
  302. $$obj{'qux'} = 99;
  303. bless($obj, 'My::Class');
  304. $q->enqueue($obj);
  305. See L</"LIMITATIONS"> for caveats related to passing objects via queues.
  306. =head1 QUEUE CREATION
  307. =over
  308. =item ->new()
  309. Creates a new empty queue.
  310. =item ->new(LIST)
  311. Creates a new queue pre-populated with the provided list of items.
  312. =back
  313. =head1 BASIC METHODS
  314. The following methods deal with queues on a FIFO basis.
  315. =over
  316. =item ->enqueue(LIST)
  317. Adds a list of items onto the end of the queue.
  318. =item ->dequeue()
  319. =item ->dequeue(COUNT)
  320. Removes the requested number of items (default is 1) from the head of the
  321. queue, and returns them. If the queue contains fewer than the requested
  322. number of items, then the thread will be blocked until the requisite number
  323. of items are available (i.e., until other threads C<enqueue> more items).
  324. =item ->dequeue_nb()
  325. =item ->dequeue_nb(COUNT)
  326. Removes the requested number of items (default is 1) from the head of the
  327. queue, and returns them. If the queue contains fewer than the requested
  328. number of items, then it immediately (i.e., non-blocking) returns whatever
  329. items there are on the queue. If the queue is empty, then C<undef> is
  330. returned.
  331. =item ->dequeue_timed(TIMEOUT)
  332. =item ->dequeue_timed(TIMEOUT, COUNT)
  333. Removes the requested number of items (default is 1) from the head of the
  334. queue, and returns them. If the queue contains fewer than the requested
  335. number of items, then the thread will be blocked until the requisite number of
  336. items are available, or until the timeout is reached. If the timeout is
  337. reached, it returns whatever items there are on the queue, or C<undef> if the
  338. queue is empty.
  339. The timeout may be a number of seconds relative to the current time (e.g., 5
  340. seconds from when the call is made), or may be an absolute timeout in I<epoch>
  341. seconds the same as would be used with
  342. L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
  343. Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
  344. the underlying implementation).
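  345. If C<TIMEOUT> is missing, or less than or equal to 0, then this call behaves
  346. the same as C<dequeue_nb>. A C<TIMEOUT> that is C<undef> or not a number causes the call to fail with an C<Invalid 'timeout' argument> error.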
  347. =item ->pending()
  348. Returns the number of items still in the queue. Returns C<undef> if the queue
  349. has been ended (see below), and there are no more items in the queue.
  350. =item ->end()
  351. Declares that no more items will be added to the queue.
  352. All threads blocking on C<dequeue()> calls will be unblocked with any
  353. remaining items in the queue and/or C<undef> being returned. Any subsequent
  354. calls to C<dequeue()> will behave like C<dequeue_nb()>.
  355. Once ended, no more items may be placed in the queue.
  356. =back
  357. =head1 ADVANCED METHODS
  358. The following methods can be used to manipulate items anywhere in a queue.
  359. To prevent the contents of a queue from being modified by another thread
  360. while it is being examined and/or changed, L<lock|threads::shared/"lock
  361. VARIABLE"> the queue inside a local block:
  362. {
  363. lock($q); # Keep other threads from changing the queue's contents
  364. my $item = $q->peek();
  365. if ($item ...) {
  366. ...
  367. }
  368. }
  369. # Queue is now unlocked
  370. =over
  371. =item ->peek()
  372. =item ->peek(INDEX)
  373. Returns an item from the queue without dequeuing anything. Defaults to the
  374. head of the queue (at index position 0) if no index is specified. Negative
  375. index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
  376. is the end of the queue, -2 is next to last, and so on).
  377. If no item exists at the specified index (i.e., the queue is empty, or the
  378. index is beyond the number of items on the queue), then C<undef> is returned.
  379. Remember, the returned item is not removed from the queue, so manipulating a
  380. C<peek>ed at reference affects the item on the queue.
  381. =item ->insert(INDEX, LIST)
  382. Adds the list of items to the queue at the specified index position (0
  383. is the head of the list). Any existing items at and beyond that position are
  384. pushed back past the newly added items:
  385. $q->enqueue(1, 2, 3, 4);
  386. $q->insert(1, qw/foo bar/);
  387. # Queue now contains: 1, foo, bar, 2, 3, 4
  388. Specifying an index position greater than the number of items in the queue
  389. just adds the list to the end.
  390. Negative index positions are supported:
  391. $q->enqueue(1, 2, 3, 4);
  392. $q->insert(-2, qw/foo bar/);
  393. # Queue now contains: 1, 2, foo, bar, 3, 4
  394. Specifying a negative index position greater than the number of items in the
  395. queue adds the list to the head of the queue.
  396. =item ->extract()
  397. =item ->extract(INDEX)
  398. =item ->extract(INDEX, COUNT)
  399. Removes and returns the specified number of items (defaults to 1) from the
  400. specified index position in the queue (0 is the head of the queue). When
  401. called with no arguments, C<extract> operates the same as C<dequeue_nb>.
  402. This method is non-blocking, and will return only as many items as are
  403. available to fulfill the request:
  404. $q->enqueue(1, 2, 3, 4);
  405. my $item = $q->extract(2); # Returns 3
  406. # Queue now contains: 1, 2, 4
  407. my @items = $q->extract(1, 3); # Returns (2, 4)
  408. # Queue now contains: 1
  409. Specifying an index position greater than the number of items in the
  410. queue results in C<undef> or an empty list being returned.
  411. $q->enqueue('foo');
  412. my $nada = $q->extract(3); # Returns undef
  413. my @nada = $q->extract(1, 3); # Returns ()
  414. Negative index positions are supported. Specifying a negative index position
  415. greater than the number of items in the queue may return items from the head
  416. of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
  417. queue from the specified position (i.e. if queue size + index + count is
  418. greater than zero):
  419. $q->enqueue(qw/foo bar baz/);
  420. my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
  421. my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
  422. # Queue now contains: bar, baz
  423. my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
  424. =back
  425. =head1 NOTES
  426. Queues created by L<Thread::Queue> can be used in both threaded and
  427. non-threaded applications.
  428. =head1 LIMITATIONS
  429. Passing objects on queues may not work if the objects' classes do not support
  430. sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
  431. Passing array/hash refs that contain objects may not work for Perl prior to
  432. 5.10.0.
  433. =head1 SEE ALSO
  434. Thread::Queue Discussion Forum on CPAN:
  435. L<http://www.cpanforum.com/dist/Thread-Queue>
  436. L<threads>, L<threads::shared>
  437. Sample code in the I<examples> directory of this distribution on CPAN.
  438. =head1 MAINTAINER
  439. Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
  440. =head1 LICENSE
  441. This program is free software; you can redistribute it and/or modify it under
  442. the same terms as Perl itself.
  443. =cut