using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; namespace SuperSocket.Common { /// /// SendingQueue /// public sealed class SendingQueue : IList> { private readonly int m_Offset; private readonly int m_Capacity; private int m_CurrentCount = 0; private ArraySegment[] m_GlobalQueue; private static ArraySegment m_Null = default(ArraySegment); private int m_UpdatingCount; private bool m_ReadOnly = false; private ushort m_TrackID = 1; private int m_InnerOffset = 0; /// /// Gets the track ID. /// /// /// The track ID. /// public ushort TrackID { get { return m_TrackID; } } /// /// Initializes a new instance of the class. /// /// The global queue. /// The offset. /// The capacity. public SendingQueue(ArraySegment[] globalQueue, int offset, int capacity) { m_GlobalQueue = globalQueue; m_Offset = offset; m_Capacity = capacity; } private bool TryEnqueue(ArraySegment item, out bool conflict, ushort trackID) { conflict = false; var oldCount = m_CurrentCount; if (oldCount >= Capacity) return false; if (m_ReadOnly) return false; if (trackID != m_TrackID) return false; int compareCount = Interlocked.CompareExchange(ref m_CurrentCount, oldCount + 1, oldCount); //conflicts if (compareCount != oldCount) { conflict = true; return false; } m_GlobalQueue[m_Offset + oldCount] = item; return true; } /// /// Enqueues the specified item. /// /// The item. /// The track ID. /// public bool Enqueue(ArraySegment item, ushort trackID) { if (m_ReadOnly) return false; Interlocked.Increment(ref m_UpdatingCount); while (!m_ReadOnly) { bool conflict = false; if (TryEnqueue(item, out conflict, trackID)) { Interlocked.Decrement(ref m_UpdatingCount); return true; } //Needn't retry if (!conflict) break; } Interlocked.Decrement(ref m_UpdatingCount); return false; } /// /// Enqueues the specified items. /// /// The items. /// The track ID. /// public bool Enqueue(IList> items, ushort trackID) { if (m_ReadOnly) return false; Interlocked.Increment(ref m_UpdatingCount); bool conflict; while (!m_ReadOnly) { if (TryEnqueue(items, out conflict, trackID)) { Interlocked.Decrement(ref m_UpdatingCount); return true; } if (!conflict) break; } Interlocked.Decrement(ref m_UpdatingCount); return false; } private bool TryEnqueue(IList> items, out bool conflict, ushort trackID) { conflict = false; var oldCount = m_CurrentCount; int newItemCount = items.Count; int expectedCount = oldCount + newItemCount; if (expectedCount > Capacity) return false; if (m_ReadOnly) return false; if (m_TrackID != trackID) return false; int compareCount = Interlocked.CompareExchange(ref m_CurrentCount, expectedCount, oldCount); if (compareCount != oldCount) { conflict = true; return false; } var queue = m_GlobalQueue; for (var i = 0; i < items.Count; i++) { queue[m_Offset + oldCount + i] = items[i]; } return true; } /// /// Stops the enqueue, and then wait all current excueting enqueu threads exit. /// public void StopEnqueue() { if (m_ReadOnly) return; m_ReadOnly = true; if (m_UpdatingCount <= 0) return; var spinWait = new SpinWait(); spinWait.SpinOnce(); //Wait until all insertings are finished while (m_UpdatingCount > 0) { spinWait.SpinOnce(); } } /// /// Starts to allow enqueue. /// public void StartEnqueue() { m_ReadOnly = false; } /// /// Gets the global queue. /// /// /// The global queue. /// public ArraySegment[] GlobalQueue { get { return m_GlobalQueue; } } /// /// Gets the offset. /// /// /// The offset. /// public int Offset { get { return m_Offset; } } /// /// Gets the capacity. /// /// /// The capacity. /// public int Capacity { get { return m_Capacity; } } /// /// Gets the number of elements contained in the . /// /// The number of elements contained in the . public int Count { get { return m_CurrentCount - m_InnerOffset; } } /// /// Gets or sets the position. /// /// /// The position. /// public int Position { get; set; } /// /// Determines the index of a specific item in the . /// /// The object to locate in the . /// /// The index of if found in the list; otherwise, -1. /// /// public int IndexOf(ArraySegment item) { throw new NotSupportedException(); } /// /// Inserts an item to the at the specified index. /// /// The zero-based index at which should be inserted. /// The object to insert into the . /// public void Insert(int index, ArraySegment item) { throw new NotSupportedException(); } /// /// Removes the item at the specified index. /// /// The zero-based index of the item to remove. /// public void RemoveAt(int index) { throw new NotSupportedException(); } /// /// Gets or sets the element at the specified index. /// /// The index. /// /// public ArraySegment this[int index] { get { var targetIndex = m_Offset + m_InnerOffset + index; var value = m_GlobalQueue[targetIndex]; if (value.Array != null) return value; var spinWait = new SpinWait(); while (true) { spinWait.SpinOnce(); value = m_GlobalQueue[targetIndex]; if (value.Array != null) return value; if (spinWait.Count > 50) return value; } } set { throw new NotSupportedException(); } } /// /// Adds an item to the . /// /// The object to add to the . /// public void Add(ArraySegment item) { throw new NotSupportedException(); } /// /// Removes all items from the . /// /// public void Clear() { if (m_TrackID >= ushort.MaxValue) m_TrackID = 1; else m_TrackID++; for (var i = 0; i < m_CurrentCount; i++) { m_GlobalQueue[m_Offset + i] = m_Null; } m_CurrentCount = 0; m_InnerOffset = 0; Position = 0; } /// /// Determines whether the contains a specific value. /// /// The object to locate in the . /// /// true if is found in the ; otherwise, false. /// /// public bool Contains(ArraySegment item) { throw new NotSupportedException(); } /// /// Copies to. /// /// The array. /// Index of the array. public void CopyTo(ArraySegment[] array, int arrayIndex) { for (var i = 0; i < Count; i++) { array[arrayIndex + i] = this[i]; } } /// /// Gets a value indicating whether the is read-only. /// /// true if the is read-only; otherwise, false. public bool IsReadOnly { get { return m_ReadOnly; } } /// /// Removes the first occurrence of a specific object from the . /// /// The object to remove from the . /// /// true if was successfully removed from the ; otherwise, false. This method also returns false if is not found in the original . /// /// public bool Remove(ArraySegment item) { throw new NotSupportedException(); } /// /// Returns an enumerator that iterates through the collection. /// /// /// A that can be used to iterate through the collection. /// /// public IEnumerator> GetEnumerator() { for (var i = 0; i < (m_CurrentCount - m_InnerOffset); i++) { yield return m_GlobalQueue[m_Offset + m_InnerOffset + i]; } } /// /// Returns an enumerator that iterates through a collection. /// /// /// An object that can be used to iterate through the collection. /// /// System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() { return GetEnumerator(); } /// /// Trim the internal segments at the begining by the binary data size. /// /// The binary data size should be trimed at the begining. public void InternalTrim(int offset) { var innerCount = m_CurrentCount - m_InnerOffset; var subTotal = 0; for (var i = m_InnerOffset; i < innerCount; i++) { var segment = m_GlobalQueue[m_Offset + i]; subTotal += segment.Count; if (subTotal <= offset) continue; m_InnerOffset = i; var rest = subTotal - offset; m_GlobalQueue[m_Offset + i] = new ArraySegment(segment.Array, segment.Offset + segment.Count - rest, rest); break; } } } /// /// SendingQueueSourceCreator /// public class SendingQueueSourceCreator : ISmartPoolSourceCreator { private int m_SendingQueueSize; /// /// Initializes a new instance of the class. /// /// Size of the sending queue. public SendingQueueSourceCreator(int sendingQueueSize) { m_SendingQueueSize = sendingQueueSize; } /// /// Creates the specified size. /// /// The size. /// The pool items. /// public ISmartPoolSource Create(int size, out SendingQueue[] poolItems) { var source = new ArraySegment[size * m_SendingQueueSize]; poolItems = new SendingQueue[size]; for (var i = 0; i < size; i++) { poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize); } return new SmartPoolSource(source, size); } } }