"""
Implement the base channel construct that is used by rabbitpy objects
such as :py:class:`Exchange <rabbitpy.Exchange>` or
:py:class:`Exchange <rabbitpy.Queue>`. It is responsible for coordinating
the communication between the IO thread and the higher-level objects.
"""
import logging
from pamqp import specification as spec
from pamqp import PYTHON3
from rabbitpy import base
from rabbitpy import exceptions
from rabbitpy import message
from rabbitpy.utils import queue
LOGGER = logging.getLogger(__name__)
BASIC_DELIVER = 'Basic.Deliver'
CONTENT_BODY = 'ContentBody'
CONTENT_HEADER = 'ContentHeader'
[docs]class Channel(base.AMQPChannel):
"""The Channel object is the communications object used by Exchanges,
Messages, Queues, and Transactions. It is created by invoking the
:py:meth:`rabbitpy.Connection.channel()
<rabbitpy.connection.Connection.channel>` method. It can act as a context
manager, allowing for quick shorthand use:
.. code:: python
with connection.channel():
# Do something
To create a new channel, invoke
py:meth:`~rabbitpy.connection.Connection.channel()`
To improve performance, pass blocking_read to True. Note that doing
so prevents ``KeyboardInterrupt``/CTRL-C from exiting the Python
interpreter.
:param int channel_id: The channel # to use for this instance
:param dict server_capabilities: Features the server supports
:param events: Event management object
:type events: rabbitpy.Events
:param exception_queue: Exception queue
:type exception_queue: queue.Queue
:param read_queue: Queue to read pending frames from
:type read_queue: queue.Queue
:param write_queue: Queue to write pending AMQP objs to
:type write_queue: queue.Queue
:param int maximum_frame_size: The max frame size for msg bodies
:param socket write_trigger: Write to this socket to break IO waiting
:param bool blocking_read: Use blocking Queue.get to improve performance
:raises: rabbitpy.exceptions.RemoteClosedChannelException
:raises: rabbitpy.exceptions.AMQPException
"""
STATES = base.AMQPChannel.STATES
STATES[0x04] = 'Remotely Closed'
def __init__(self, channel_id, server_capabilities, events,
exception_queue, read_queue, write_queue,
maximum_frame_size, write_trigger, connection,
blocking_read=False):
"""Create a new instance of the Channel class"""
super(Channel, self).__init__(exception_queue, write_trigger,
connection, blocking_read)
self._channel_id = channel_id
self._consumers = {}
self._consuming = False
self._events = events
self._maximum_frame_size = maximum_frame_size
self._publisher_confirms = False
self._read_queue = read_queue
self._write_queue = write_queue
self._server_capabilities = server_capabilities
def __enter__(self):
"""For use as a context manager, return a handle to this object
instance.
:rtype: Channel
"""
return self
def __exit__(self, exc_type, exc_val, unused_exc_tb):
"""When leaving the context, examine why the context is leaving, if
it's an exception or what.
"""
if exc_type and exc_val:
LOGGER.debug('Exiting due to exception: %r', exc_val)
self._set_state(self.CLOSED)
raise exc_val
if self.open:
self.close()
[docs] def close(self):
"""Close the channel, cancelling any active consumers, purging the read
queue, while looking to see if a Basic.Nack should be sent, sending it
if so.
"""
if self._connection.closed:
LOGGER.debug('Channel %i close invoked when connection closed',
self._channel_id)
return
elif self.closed:
LOGGER.debug('Channel %i close invoked when already closed',
self._channel_id)
return
self._set_state(self.CLOSING)
# Empty the queue and nack the max id (and all previous)
if self._consumers:
delivery_tag = 0
discard_counter = 0
ack_tags = []
for queue_obj, no_ack in self._consumers.values():
self._cancel_consumer(queue_obj)
if not no_ack:
LOGGER.debug('Channel %i will nack messages for %s',
self._channel_id, queue_obj.consumer_tag)
ack_tags.append(queue_obj.consumer_tag)
# If there are any ack tags, get the last msg to nack
if ack_tags:
while not self._read_queue.empty():
frame_value = self._get_from_read_queue()
if not frame_value:
break
if (frame_value.name == BASIC_DELIVER and
frame_value.consumer_tag in ack_tags):
if delivery_tag < frame_value.delivery_tag:
delivery_tag = frame_value.delivery_tag
discard_counter += 1
if delivery_tag:
self._multi_nack(delivery_tag)
super(Channel, self).close()
[docs] def enable_publisher_confirms(self):
"""Turn on Publisher Confirms. If confirms are turned on, the
Message.publish command will return a bool indicating if a message has
been successfully published.
"""
if not self._supports_publisher_confirms:
raise exceptions.NotSupportedError('Confirm.Select')
self.rpc(spec.Confirm.Select())
self._publisher_confirms = True
@property
def id(self): # pylint: disable=invalid-name
"""Return the channel id
:rtype: int
"""
return self._channel_id
@property
def maximum_frame_size(self):
"""Return the AMQP maximum frame size
:rtype: int
"""
return self._maximum_frame_size
[docs] def open(self):
"""Open the channel, invoked directly upon creation by the Connection
"""
self._set_state(self.OPENING)
self.write_frame(self._build_open_frame())
self._wait_on_frame(spec.Channel.OpenOk)
self._set_state(self.OPEN)
LOGGER.debug('Channel #%i open', self._channel_id)
[docs] def prefetch_count(self, value, all_channels=False):
"""Set a prefetch count for the channel (or all channels on the same
connection).
:param int value: The prefetch count to set
:param bool all_channels: Set the prefetch count on all channels on the
same connection
"""
self.rpc(spec.Basic.Qos(prefetch_count=value, global_=all_channels))
[docs] def prefetch_size(self, value, all_channels=False):
"""Set a prefetch size in bytes for the channel (or all channels on the
same connection).
:param int value: The prefetch size to set
:param bool all_channels: Set the prefetch size on all channels on the
same connection
"""
if value is None:
return
self.rpc(spec.Basic.Qos(prefetch_size=value, global_=all_channels))
@property
def publisher_confirms(self):
"""Returns True if publisher confirms are enabled.
:rtype: bool
"""
return self._publisher_confirms
[docs] def recover(self, requeue=False):
"""Recover all unacknowledged messages that are associated with this
channel.
:param bool requeue: Requeue the message
"""
self.rpc(spec.Basic.Recover(requeue=requeue))
@staticmethod
def _build_open_frame():
"""Build and return a channel open frame
:rtype: pamqp.spec.Channel.Open
"""
return spec.Channel.Open()
def _cancel_consumer(self, obj, consumer_tag=None, nowait=False):
"""Cancel the consuming of a queue.
:param rabbitpy.amqp_queue.Queue obj: The queue to cancel
"""
consumer_tag = consumer_tag or obj.consumer_tag
self._interrupt_wait_on_frame(self._on_ready_to_cancel,
consumer_tag, nowait)
def _on_ready_to_cancel(self, consumer_tag, nowait):
self._check_for_exceptions()
if self.closed:
return
LOGGER.debug('Cancelling consumer while %r (%r)',
self.state_description,
self._connection.state_description)
if consumer_tag in self._consumers:
del self._consumers[consumer_tag]
if nowait:
self.write_frame(spec.Basic.Cancel(consumer_tag=consumer_tag,
nowait=True))
return
self.rpc(spec.Basic.Cancel(consumer_tag=consumer_tag))
def _check_for_rpc_request(self, value):
"""Inspect a frame to see if it's a RPC request from RabbitMQ.
:param spec.Frame value:
"""
LOGGER.debug('Checking for RPC request: %r', value)
super(Channel, self)._check_for_rpc_request(value)
if isinstance(value, spec.Basic.Return):
raise exceptions.MessageReturnedException(value.reply_code,
value.reply_text,
value.exchange,
value.routing_key)
elif isinstance(value, spec.Basic.Cancel):
self._waiting = False
if value.consumer_tag in self._consumers:
del self._consumers[value.consumer_tag]
raise exceptions.RemoteCancellationException(value.consumer_tag)
def _consume(self, obj, no_ack, priority=None):
"""Register a Queue object as a consumer, issuing Basic.Consume.
:param obj: The queue to consume
:type obj: rabbitpy.amqp_queue.Queue
:param bool no_ack: no_ack mode
:param int priority: Consumer priority
:raises: ValueError
"""
args = dict()
if priority is not None:
if not self._supports_consumer_priorities:
raise exceptions.NotSupportedError('consumer_priorities')
if not isinstance(priority, int):
raise ValueError('Consumer priority must be an int')
args['x-priority'] = priority
self.rpc(spec.Basic.Consume(queue=obj.name,
consumer_tag=obj.consumer_tag,
no_ack=no_ack,
arguments=args))
self._consumers[obj.consumer_tag] = (obj, no_ack)
def _consume_message(self):
"""Get a message from the stack, blocking while doing so. If a consumer
is cancelled out-of-band, we will receive a Basic.CancelOk
instead.
:rtype: rabbitpy.message.Message
"""
if not self._consumers:
raise exceptions.NotConsumingError
frame_value = self._wait_on_frame([spec.Basic.Deliver])
LOGGER.debug('Waited on frame, got %r', frame_value)
if frame_value:
return self._wait_for_content_frames(frame_value)
return None
def _create_message(self, method_frame, header_frame, body):
"""Create a message instance with the channel it was received on and
the dictionary of message parts. Will return None if no message can be
created.
:param method_frame: The method frame value
:type method_frame: pamqp.specification.Frame
:param header_frame: Header frame value
:type header_frame: pamqp.header.ContentHeader or None
:param body: The message body
:type body: str or None
:rtype: rabbitpy.message.Message or None
"""
if not method_frame:
LOGGER.warning('Received empty method_frame, returning None')
return None
if not header_frame:
LOGGER.debug('Malformed header frame: %r', header_frame)
props = header_frame.properties.to_dict() if header_frame else dict()
msg = message.Message(self, body, props)
msg.method = method_frame
msg.name = method_frame.name
return msg
def _get_from_read_queue(self):
"""Fetch a frame from the read queue and return it, otherwise return
None
:rtype: pamqp.specification.Frame
"""
try:
frame_value = self._read_queue.get(False)
except queue.Empty:
return None
try:
self._read_queue.task_done()
except ValueError:
pass
return frame_value
def _get_message(self):
"""Try and get a delivered message from the connection's message stack.
:rtype: rabbitpy.message.Message or None
"""
frame_value = self._wait_on_frame([spec.Basic.GetOk,
spec.Basic.GetEmpty])
if isinstance(frame_value, spec.Basic.GetEmpty):
return None
return self._wait_for_content_frames(frame_value)
def _multi_nack(self, delivery_tag, requeue=True):
"""Send a multiple negative acknowledgement, re-queueing the items
:param int delivery_tag: The delivery tag for this channel
:param bool requeue: Requeue the messages
"""
if not self._supports_basic_nack:
raise exceptions.NotSupportedError('Basic.Nack')
if self._is_debugging:
LOGGER.debug('Sending Basic.Nack with requeue')
self.rpc(spec.Basic.Nack(delivery_tag=delivery_tag,
multiple=True,
requeue=requeue))
def _reject_inbound_message(self, method_frame):
"""Used internally to reject a message when it's been received during
a state that it should not have been.
:param pamqp.specification.Basic.Deliver method_frame: The method frame
"""
self.rpc(spec.Basic.Reject(delivery_tag=method_frame.delivery_tag,
requeue=True))
@property
def _supports_basic_nack(self):
"""Indicates if the server supports Basic.Nack
:rtype: bool
"""
return self._server_capabilities.get('basic.nack', False)
@property
def _supports_consumer_cancel_notify(self): # pylint: disable=invalid-name
"""Indicates if the server supports sending consumer cancellation
notifications
:rtype: bool
"""
return self._server_capabilities.get('consumer_cancel_notify', False)
@property
def _supports_consumer_priorities(self):
"""Indicates if the server supports consumer priorities
:rtype: bool
"""
return self._server_capabilities.get('consumer_priorities', False)
@property
def _supports_per_consumer_qos(self):
"""Indicates if the server supports per consumer qos
:rtype: bool
"""
return self._server_capabilities.get('per_consumer_qos', False)
@property
def _supports_publisher_confirms(self):
"""Indicates if the server supports publisher confirmations
:rtype: bool
"""
return self._server_capabilities.get('publisher_confirms', False)
def _wait_for_content_frames(self, method_frame):
"""Used by both Channel._get_message and Channel._consume_message for
getting a message parts off the queue and returning the fully
constructed message.
:param method_frame: The method frame for the message
:type method_frame: Basic.Deliver or Basic.Get or Basic.Return
:rtype: rabbitpy.Message
"""
if self.closing or self.closed:
return None
consuming = isinstance(method_frame, spec.Basic.Deliver)
if consuming and not self._consumers:
return None
if self._is_debugging:
LOGGER.debug('Waiting on content frames for %s: %r',
method_frame.name, method_frame.delivery_tag)
header_value = self._wait_on_frame(CONTENT_HEADER)
if not header_value:
self._reject_inbound_message(method_frame)
return None
self._check_for_rpc_request(header_value)
if self._interrupt_is_set:
return self._on_interrupt_set()
error = False
# To retrieve the message body we must concatenate the binary content
# of several frames. The recommended idiom for this differs
# in py3 and py2.
if PYTHON3:
body_value = bytearray()
else:
body_chunks = []
body_length_received = 0
body_total_size = header_value.body_size
while body_length_received < body_total_size:
body_part = self._wait_on_frame(CONTENT_BODY)
self._check_for_rpc_request(body_part)
if self._interrupt_is_set:
self._on_interrupt_set()
error = True
elif not body_part:
break
elif self.closing or self.closed:
error = True
elif consuming and not self._consumers:
self._reject_inbound_message(method_frame)
error = True
if error:
return
body_length_received += len(body_part.value)
if PYTHON3:
body_value += body_part.value
else:
body_chunks.append(body_part.value)
if not PYTHON3:
body_value = ''.join(body_chunks)
return self._create_message(method_frame, header_value, body_value)