123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603 |
- package Thread::Queue;
- use strict;
- use warnings;
- our $VERSION = '3.05';
- $VERSION = eval $VERSION;
- use threads::shared 1.21;
- use Scalar::Util 1.10 qw(looks_like_number blessed reftype refaddr);
- # Carp errors from threads::shared calls should complain about caller
- our @CARP_NOT = ("threads::shared");
# Constructor: build a queue object, optionally pre-loaded with items.
#   Params:  class name, followed by zero or more initial items.
#   Returns: a blessed reference to a ':shared' hash whose 'queue' entry
#            refers to a ':shared' array holding the items.
# Every item is passed through shared_clone() before it is stored.
sub new
{
    my ($class, @initial) = @_;

    my @contents :shared;
    push(@contents, shared_clone($_)) for (@initial);

    my %object :shared;
    $object{'queue'} = \@contents;

    return bless(\%object, $class);
}
# Append items to the tail of the queue.
#   Params: the items to add; each is passed through shared_clone() first.
#   Croaks: if the queue has already been marked 'ENDED'.
# A waiting thread is signalled whenever push() reports a non-empty queue.
sub enqueue
{
    my ($self, @items) = @_;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'enqueue' method called on queue that has been 'end'ed");
    }

    # push() returns the new length of the queue
    my $length = push(@{$$self{'queue'}}, map { shared_clone($_) } @items);
    cond_signal(%$self) if ($length);
}
# Report how many items are currently on the queue.
#   Returns: the item count, or nothing (undef in scalar context) once the
#            queue has been ended and fully drained.
sub pending
{
    my $self = shift;
    lock(%$self);

    my $items = $$self{'queue'};
    if (@$items || ! $$self{'ENDED'}) {
        return scalar(@$items);
    }
    return;
}
# Declare that no more data will be added to the queue.
# Sets the 'ENDED' flag, which enqueue() and insert() check before adding
# items and which the dequeue methods check before waiting.
sub end
{
    my $self = shift;
    # Lock the queue hash itself, spelled the same way as in every other
    # method of this class (previously written as 'lock $self').
    lock(%$self);

    # No more data is coming
    $$self{'ENDED'} = 1;

    # Wake one blocked thread; dequeue() and dequeue_timed() signal again
    # when they see 'ENDED', so the wake-up is handed on to other waiters.
    cond_signal(%$self);
}
# Remove and return items from the head of the queue, blocking until enough
# items are present or the queue has been ended.
#   Params:  optional COUNT (validated by _validate_count); defaults to 1.
#   Returns: a single item when COUNT is 1, otherwise a list of COUNT items.
#            On an ended queue the result is whatever dequeue_nb() returns.
sub dequeue
{
    my $self = shift;
    lock(%$self);

    my $count = 1;
    $count = $self->_validate_count(shift) if (@_);
    my $queue = $$self{'queue'};

    # Sleep until the request can be satisfied or the queue is ended
    until ((@$queue >= $count) || $$self{'ENDED'}) {
        cond_wait(%$self);
    }

    # Hand the wake-up on when items will be left over or the queue is ended
    if ($$self{'ENDED'} || (@$queue > $count)) {
        cond_signal(%$self);
    }

    # An ended queue never blocks: just take what is left
    if ($$self{'ENDED'}) {
        return $self->dequeue_nb($count);
    }

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested
    my @items = map { shift(@$queue) } (1 .. $count);
    return @items;
}
# Remove and return items from the head of the queue without waiting.
#   Params:  optional COUNT (validated by _validate_count); defaults to 1.
#   Returns: for COUNT == 1, the head item (undef if the queue is empty);
#            otherwise a list of up to COUNT items, possibly empty.
sub dequeue_nb
{
    my $self = shift;
    lock(%$self);

    my $count = 1;
    $count = $self->_validate_count(shift) if (@_);
    my $queue = $$self{'queue'};

    # Single item requested
    if ($count == 1) {
        return shift(@$queue);
    }

    # Several items requested: stop early if the queue runs dry
    my @items;
    while (@$queue && (@items < $count)) {
        push(@items, shift(@$queue));
    }
    return @items;
}
# Remove and return items from the head of the queue, blocking until enough
# items are present, the queue is ended, or the timeout expires.
#   Params:  TIMEOUT - seconds, relative or absolute (see below).  When it
#                      is missing or undef the call does not wait, matching
#                      the documented "behaves the same as dequeue_nb".
#            COUNT   - optional number of items (validated by
#                      _validate_count); defaults to 1.
#   Returns: whatever dequeue_nb(COUNT) returns once waiting is over.
#   Croaks:  via _validate_timeout / _validate_count on invalid arguments.
sub dequeue_timed
{
    my $self = shift;
    lock(%$self);
    my $queue = $$self{'queue'};

    # Only a defined timeout is validated: _validate_timeout() rejects undef,
    # whereas the documentation promises that undef simply means "no wait".
    my $timeout = shift;
    $timeout = defined($timeout) ? $self->_validate_timeout($timeout) : -1;

    # Convert to an absolute time for use with cond_timedwait().  Values
    # below 32,000,000 (about one year in seconds) are taken to be relative
    # to now; anything larger is assumed to be an absolute epoch time.
    if ($timeout < 32000000) {
        $timeout += time();
    }

    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Wait for requisite number of items, or until timeout
    while ((@$queue < $count) && ! $$self{'ENDED'}) {
        last if (! cond_timedwait(%$self, $timeout));
    }
    # Hand the wake-up on when items will be left over or the queue is ended
    cond_signal(%$self) if ((@$queue > $count) || $$self{'ENDED'});

    # Get whatever we need off the queue if available
    return $self->dequeue_nb($count);
}
# Look at an item on the queue without removing it.
#   Params:  optional INDEX (validated by _validate_index); defaults to 0,
#            the head of the queue.
#   Returns: the element stored at that position of the queue array.
sub peek
{
    my $self = shift;
    lock(%$self);

    my $index = 0;
    $index = $self->_validate_index(shift) if (@_);

    my $queue = $$self{'queue'};
    return $$queue[$index];
}
# Insert items at an arbitrary position in the queue.
#   Params: INDEX (validated by _validate_index) followed by the items to
#           add; each item is passed through shared_clone() first.
#   Croaks: if the queue has already been marked 'ENDED'.
# Items at and after INDEX end up behind the newly inserted ones.  Returns
# early, without signalling, when no items are supplied.
sub insert
{
    my $self = shift;
    lock(%$self);

    if ($$self{'ENDED'}) {
        require Carp;
        Carp::croak("'insert' method called on queue that has been 'end'ed");
    }

    my $index = $self->_validate_index(shift);
    my $queue = $$self{'queue'};

    # Nothing to insert
    if (! @_) {
        return;
    }

    # A negative index counts back from the tail, clamped at the head
    if ($index < 0) {
        $index += @$queue;
        $index = 0 if ($index < 0);
    }

    # Peel off everything from $index onward, keeping its order.
    # NOTE(review): presumably done item by item rather than with splice()
    # because of threads::shared limits on shared arrays - confirm.
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $index);

    # New items first, then the items that were peeled off
    push(@$queue, (map { shared_clone($_) } @_), @tail);

    # Soup's up
    cond_signal(%$self);
}
# Remove and return items from an arbitrary position in the queue, without
# blocking.
#   Params:  optional INDEX (validated by _validate_index; default 0) and
#            optional COUNT (validated by _validate_count; default 1).
#   Returns: for COUNT == 1, the extracted item (undef if none); otherwise
#            the list of extracted items, possibly empty.
# A negative INDEX counts back from the tail.  If it reaches past the head,
# COUNT is reduced by the overshoot and any remainder is taken from the head
# via dequeue_nb().
sub extract
{
    my $self = shift;
    lock(%$self);

    my $queue = $$self{'queue'};
    my $index = @_ ? $self->_validate_index(shift) : 0;
    my $count = @_ ? $self->_validate_count(shift) : 1;

    # Support negative indices
    if ($index < 0) {
        $index += @$queue;
        if ($index < 0) {
            # $index is now minus the distance beyond the head
            $count += $index;
            if ($count <= 0) {
                return;     # Request lies entirely beyond the head
            }
            return $self->dequeue_nb($count);
        }
    }

    # Set aside everything located after the requested range
    my $range_end = $index + $count;
    my @tail;
    unshift(@tail, pop(@$queue)) while (@$queue > $range_end);

    # What now remains from $index onward is the requested range
    my @items;
    while (@$queue > $index) {
        unshift(@items, pop(@$queue));
    }

    # Put the set-aside items back
    push(@$queue, @tail);

    # Single item for a count of one, otherwise the whole list
    return ($count == 1) ? $items[0] : @items;
}
- ### Internal Methods ###
# Check value of the requested index.
#   Params:  the candidate index.
#   Returns: the index unchanged when it is a defined, integer-valued number
#            (negative values are accepted).
#   Croaks:  "Invalid 'index' argument (...) to '<method>' method" otherwise,
#            where <method> is the name of the sub that called this one.
sub _validate_index
{
    my $self = shift;
    my $index = shift;

    if (! defined($index) ||
        ! looks_like_number($index) ||
        (int($index) != $index))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # caller() reports the sub name qualified with the package it was
        # compiled in, which differs from ref($self) for subclass instances.
        # Strip the whole qualifier so the bare method name is always shown.
        $method =~ s/\A.*:://s;
        $index = 'undef' if (! defined($index));
        Carp::croak("Invalid 'index' argument ($index) to '$method' method");
    }

    return $index;
}
# Check value of the requested count.
#   Params:  the candidate count.
#   Returns: the count unchanged when it is a defined, integer-valued number
#            of at least 1.
#   Croaks:  "Invalid 'count' argument (...) to '<method>' method" otherwise,
#            where <method> is the name of the sub that called this one.
sub _validate_count
{
    my $self = shift;
    my $count = shift;

    if (! defined($count) ||
        ! looks_like_number($count) ||
        (int($count) != $count) ||
        ($count < 1))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # caller() reports the sub name qualified with the package it was
        # compiled in, which differs from ref($self) for subclass instances.
        # Strip the whole qualifier so the bare method name is always shown.
        $method =~ s/\A.*:://s;
        $count = 'undef' if (! defined($count));
        Carp::croak("Invalid 'count' argument ($count) to '$method' method");
    }

    return $count;
}
# Check value of the requested timeout.
#   Params:  the candidate timeout.
#   Returns: the timeout unchanged when it is defined and looks like a
#            number (fractional and negative values are accepted).
#   Croaks:  "Invalid 'timeout' argument (...) to '<method>' method"
#            otherwise, where <method> is the name of the calling sub.
sub _validate_timeout
{
    my $self = shift;
    my $timeout = shift;

    if (! defined($timeout) ||
        ! looks_like_number($timeout))
    {
        require Carp;
        my ($method) = (caller(1))[3];
        # caller() reports the sub name qualified with the package it was
        # compiled in, which differs from ref($self) for subclass instances.
        # Strip the whole qualifier so the bare method name is always shown.
        $method =~ s/\A.*:://s;
        $timeout = 'undef' if (! defined($timeout));
        Carp::croak("Invalid 'timeout' argument ($timeout) to '$method' method");
    }

    return $timeout;
}
- 1;
- =head1 NAME
- Thread::Queue - Thread-safe queues
- =head1 VERSION
- This document describes Thread::Queue version 3.05
- =head1 SYNOPSIS
- use strict;
- use warnings;
- use threads;
- use Thread::Queue;
- my $q = Thread::Queue->new(); # A new empty queue
- # Worker thread
- my $thr = threads->create(
- sub {
- # Thread will loop until no more work
- while (defined(my $item = $q->dequeue())) {
- # Do work on $item
- ...
- }
- }
- );
- # Send work to the thread
- $q->enqueue($item1, ...);
- # Signal that there is no more work to be sent
- $q->end();
- # Join up with the thread when it finishes
- $thr->join();
- ...
- # Count of items in the queue
- my $left = $q->pending();
- # Non-blocking dequeue
- if (defined(my $item = $q->dequeue_nb())) {
- # Work on $item
- }
- # Blocking dequeue with 5-second timeout
- if (defined(my $item = $q->dequeue_timed(5))) {
- # Work on $item
- }
- # Get the second item in the queue without dequeuing anything
- my $item = $q->peek(1);
- # Insert two items into the queue just behind the head
- $q->insert(1, $item1, $item2);
- # Extract the last two items on the queue
- my ($item1, $item2) = $q->extract(-2, 2);
- =head1 DESCRIPTION
- This module provides thread-safe FIFO queues that can be accessed safely by
- any number of threads.
- Any data types supported by L<threads::shared> can be passed via queues:
- =over
- =item Ordinary scalars
- =item Array refs
- =item Hash refs
- =item Scalar refs
- =item Objects based on the above
- =back
- Ordinary scalars are added to queues as they are.
- If not already thread-shared, the other complex data types will be cloned
- (recursively, if needed, and including any C<bless>ings and read-only
- settings) into thread-shared structures before being placed onto a queue.
- For example, the following would cause L<Thread::Queue> to create an empty,
- shared array reference via C<&shared([])>, copy the elements 'foo', 'bar'
- and 'baz' from C<@ary> into it, and then place that shared reference onto
- the queue:
- my @ary = qw/foo bar baz/;
- $q->enqueue(\@ary);
- However, for the following, the items are already shared, so their references
- are added directly to the queue, and no cloning takes place:
- my @ary :shared = qw/foo bar baz/;
- $q->enqueue(\@ary);
- my $obj = &shared({});
- $$obj{'foo'} = 'bar';
- $$obj{'qux'} = 99;
- bless($obj, 'My::Class');
- $q->enqueue($obj);
- See L</"LIMITATIONS"> for caveats related to passing objects via queues.
- =head1 QUEUE CREATION
- =over
- =item ->new()
- Creates a new empty queue.
- =item ->new(LIST)
- Creates a new queue pre-populated with the provided list of items.
- =back
- =head1 BASIC METHODS
- The following methods deal with queues on a FIFO basis.
- =over
- =item ->enqueue(LIST)
- Adds a list of items onto the end of the queue.
- =item ->dequeue()
- =item ->dequeue(COUNT)
- Removes the requested number of items (default is 1) from the head of the
- queue, and returns them. If the queue contains fewer than the requested
- number of items, then the thread will be blocked until the requisite number
- of items are available (i.e., until other threads C<enqueue> more items).
- =item ->dequeue_nb()
- =item ->dequeue_nb(COUNT)
- Removes the requested number of items (default is 1) from the head of the
- queue, and returns them. If the queue contains fewer than the requested
- number of items, then it immediately (i.e., non-blocking) returns whatever
- items there are on the queue. If the queue is empty, then C<undef> is
- returned.
- =item ->dequeue_timed(TIMEOUT)
- =item ->dequeue_timed(TIMEOUT, COUNT)
- Removes the requested number of items (default is 1) from the head of the
- queue, and returns them. If the queue contains fewer than the requested
- number of items, then the thread will be blocked until the requisite number of
- items are available, or until the timeout is reached. If the timeout is
- reached, it returns whatever items there are on the queue, or C<undef> if the
- queue is empty.
- The timeout may be a number of seconds relative to the current time (e.g., 5
- seconds from when the call is made), or may be an absolute timeout in I<epoch>
- seconds the same as would be used with
- L<cond_timedwait()|threads::shared/"cond_timedwait VARIABLE, ABS_TIMEOUT">.
- Fractional seconds (e.g., 2.5 seconds) are also supported (to the extent of
- the underlying implementation).
- If C<TIMEOUT> is missing, C<undef>, or less than or equal to 0, then this call
- behaves the same as C<dequeue_nb>.
- =item ->pending()
- Returns the number of items still in the queue. Returns C<undef> if the queue
- has been ended (see below), and there are no more items in the queue.
- =item ->end()
- Declares that no more items will be added to the queue.
- All threads blocking on C<dequeue()> calls will be unblocked with any
- remaining items in the queue and/or C<undef> being returned. Any subsequent
- calls to C<dequeue()> will behave like C<dequeue_nb()>.
- Once ended, no more items may be placed in the queue.
- =back
- =head1 ADVANCED METHODS
- The following methods can be used to manipulate items anywhere in a queue.
- To prevent the contents of a queue from being modified by another thread
- while it is being examined and/or changed, L<lock|threads::shared/"lock
- VARIABLE"> the queue inside a local block:
- {
- lock($q); # Keep other threads from changing the queue's contents
- my $item = $q->peek();
- if ($item ...) {
- ...
- }
- }
- # Queue is now unlocked
- =over
- =item ->peek()
- =item ->peek(INDEX)
- Returns an item from the queue without dequeuing anything. Defaults to the
- head of the queue (at index position 0) if no index is specified. Negative
- index values are supported as with L<arrays|perldata/"Subscripts"> (i.e., -1
- is the end of the queue, -2 is next to last, and so on).
- If no item exists at the specified index (i.e., the queue is empty, or the
- index is beyond the number of items on the queue), then C<undef> is returned.
- Remember, the returned item is not removed from the queue, so manipulating a
- C<peek>ed at reference affects the item on the queue.
- =item ->insert(INDEX, LIST)
- Adds the list of items to the queue at the specified index position (0
- is the head of the list). Any existing items at and beyond that position are
- pushed back past the newly added items:
- $q->enqueue(1, 2, 3, 4);
- $q->insert(1, qw/foo bar/);
- # Queue now contains: 1, foo, bar, 2, 3, 4
- Specifying an index position greater than the number of items in the queue
- just adds the list to the end.
- Negative index positions are supported:
- $q->enqueue(1, 2, 3, 4);
- $q->insert(-2, qw/foo bar/);
- # Queue now contains: 1, 2, foo, bar, 3, 4
- Specifying a negative index position greater than the number of items in the
- queue adds the list to the head of the queue.
- =item ->extract()
- =item ->extract(INDEX)
- =item ->extract(INDEX, COUNT)
- Removes and returns the specified number of items (defaults to 1) from the
- specified index position in the queue (0 is the head of the queue). When
- called with no arguments, C<extract> operates the same as C<dequeue_nb>.
- This method is non-blocking, and will return only as many items as are
- available to fulfill the request:
- $q->enqueue(1, 2, 3, 4);
- my $item = $q->extract(2); # Returns 3
- # Queue now contains: 1, 2, 4
- my @items = $q->extract(1, 3); # Returns (2, 4)
- # Queue now contains: 1
- Specifying an index position greater than the number of items in the
- queue results in C<undef> or an empty list being returned.
- $q->enqueue('foo');
- my $nada = $q->extract(3); # Returns undef
- my @nada = $q->extract(1, 3); # Returns ()
- Negative index positions are supported. Specifying a negative index position
- greater than the number of items in the queue may return items from the head
- of the queue (similar to C<dequeue_nb>) if the count overlaps the head of the
- queue from the specified position (i.e. if queue size + index + count is
- greater than zero):
- $q->enqueue(qw/foo bar baz/);
- my @nada = $q->extract(-6, 2); # Returns () - (3+(-6)+2) <= 0
- my @some = $q->extract(-6, 4); # Returns (foo) - (3+(-6)+4) > 0
- # Queue now contains: bar, baz
- my @rest = $q->extract(-3, 4); # Returns (bar, baz) - (2+(-3)+4) > 0
- =back
- =head1 NOTES
- Queues created by L<Thread::Queue> can be used in both threaded and
- non-threaded applications.
- =head1 LIMITATIONS
- Passing objects on queues may not work if the objects' classes do not support
- sharing. See L<threads::shared/"BUGS AND LIMITATIONS"> for more.
- Passing array/hash refs that contain objects may not work for Perl prior to
- 5.10.0.
- =head1 SEE ALSO
- Thread::Queue Discussion Forum on CPAN:
- L<http://www.cpanforum.com/dist/Thread-Queue>
- L<threads>, L<threads::shared>
- Sample code in the I<examples> directory of this distribution on CPAN.
- =head1 MAINTAINER
- Jerry D. Hedden, S<E<lt>jdhedden AT cpan DOT orgE<gt>>
- =head1 LICENSE
- This program is free software; you can redistribute it and/or modify it under
- the same terms as Perl itself.
- =cut
|