using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
namespace SuperSocket.Common
{
/// <summary>
/// A fixed-capacity queue of binary segments stored in one slice of a shared array:
/// slots <see cref="Offset"/> to <see cref="Offset"/> + <see cref="Capacity"/> - 1
/// of <see cref="GlobalQueue"/>.
/// </summary>
/// <remarks>
/// Producers reserve slots with a compare-and-swap on the item count and write the segment
/// afterwards, so a reader can briefly observe a reserved slot that has not been written yet
/// (the indexer spins for a short while in that case).
/// Only the read side of <see cref="IList{T}"/> is supported; the mutating list members
/// throw <see cref="NotSupportedException"/>.
/// </remarks>
public sealed class SendingQueue : IList<ArraySegment<byte>>
{
    private readonly int m_Offset;
    private readonly int m_Capacity;

    // Number of slots reserved so far (counted from m_Offset), including trimmed ones.
    private int m_CurrentCount = 0;

    private readonly ArraySegment<byte>[] m_GlobalQueue;

    // Number of Enqueue calls currently in flight; StopEnqueue waits for it to reach zero.
    private int m_UpdatingCount;

    private bool m_ReadOnly = false;
    private ushort m_TrackID = 1;

    // Number of leading slots hidden by InternalTrim.
    private int m_InnerOffset = 0;

    /// <summary>
    /// Gets the track ID. It starts at 1 and is advanced by <see cref="Clear"/>;
    /// enqueue calls carrying a different track ID are rejected.
    /// </summary>
    /// <value>
    /// The track ID.
    /// </value>
    public ushort TrackID
    {
        get { return m_TrackID; }
    }

    /// <summary>
    /// Initializes a new instance of the <see cref="SendingQueue"/> class.
    /// </summary>
    /// <param name="globalQueue">The shared array that holds the slots of this queue.</param>
    /// <param name="offset">The index of this queue's first slot inside <paramref name="globalQueue"/>.</param>
    /// <param name="capacity">The number of slots that belong to this queue.</param>
    public SendingQueue(ArraySegment<byte>[] globalQueue, int offset, int capacity)
    {
        m_GlobalQueue = globalQueue;
        m_Offset = offset;
        m_Capacity = capacity;
    }

    /// <summary>
    /// Makes a single attempt to reserve one slot and store <paramref name="item"/> in it.
    /// </summary>
    /// <param name="item">The segment to store.</param>
    /// <param name="conflict">Set to true when the attempt lost the race for the slot and may be retried.</param>
    /// <param name="trackID">The track ID the caller believes is current.</param>
    /// <returns>true if the item was stored; otherwise false.</returns>
    private bool TryEnqueue(ArraySegment<byte> item, out bool conflict, ushort trackID)
    {
        conflict = false;

        var oldCount = m_CurrentCount;

        if (oldCount >= Capacity)
            return false;

        if (m_ReadOnly)
            return false;

        if (trackID != m_TrackID)
            return false;

        // Reserve the slot first; the segment is written only after the reservation succeeded.
        int compareCount = Interlocked.CompareExchange(ref m_CurrentCount, oldCount + 1, oldCount);

        // Another thread changed the count in the meantime.
        if (compareCount != oldCount)
        {
            conflict = true;
            return false;
        }

        m_GlobalQueue[m_Offset + oldCount] = item;
        return true;
    }

    /// <summary>
    /// Enqueues the specified item, retrying while other threads win the race for the next slot.
    /// </summary>
    /// <param name="item">The item.</param>
    /// <param name="trackID">The track ID; it must equal <see cref="TrackID"/>.</param>
    /// <returns>
    /// true if the item was stored; false if the queue is read-only, full,
    /// or <paramref name="trackID"/> does not match.
    /// </returns>
    public bool Enqueue(ArraySegment<byte> item, ushort trackID)
    {
        if (m_ReadOnly)
            return false;

        Interlocked.Increment(ref m_UpdatingCount);

        // The finally block releases the in-flight counter on every exit path, including
        // exceptions; otherwise StopEnqueue would wait forever.
        try
        {
            while (!m_ReadOnly)
            {
                bool conflict;

                if (TryEnqueue(item, out conflict, trackID))
                    return true;

                // Needn't retry
                if (!conflict)
                    break;
            }

            return false;
        }
        finally
        {
            Interlocked.Decrement(ref m_UpdatingCount);
        }
    }

    /// <summary>
    /// Enqueues all of the specified items as one contiguous run, or none of them.
    /// </summary>
    /// <param name="items">The items.</param>
    /// <param name="trackID">The track ID; it must equal <see cref="TrackID"/>.</param>
    /// <returns>
    /// true if all items were stored; false if the queue is read-only, lacks room for all
    /// items, or <paramref name="trackID"/> does not match.
    /// </returns>
    public bool Enqueue(IList<ArraySegment<byte>> items, ushort trackID)
    {
        if (m_ReadOnly)
            return false;

        Interlocked.Increment(ref m_UpdatingCount);

        // See the single-item overload: the counter must be released even if the attempt throws.
        try
        {
            while (!m_ReadOnly)
            {
                bool conflict;

                if (TryEnqueue(items, out conflict, trackID))
                    return true;

                if (!conflict)
                    break;
            }

            return false;
        }
        finally
        {
            Interlocked.Decrement(ref m_UpdatingCount);
        }
    }

    /// <summary>
    /// Makes a single attempt to reserve <c>items.Count</c> slots and copy the items into them.
    /// </summary>
    /// <param name="items">The segments to store.</param>
    /// <param name="conflict">Set to true when the attempt lost the race for the slots and may be retried.</param>
    /// <param name="trackID">The track ID the caller believes is current.</param>
    /// <returns>true if the items were stored; otherwise false.</returns>
    private bool TryEnqueue(IList<ArraySegment<byte>> items, out bool conflict, ushort trackID)
    {
        conflict = false;

        var oldCount = m_CurrentCount;
        int newItemCount = items.Count;
        int expectedCount = oldCount + newItemCount;

        if (expectedCount > Capacity)
            return false;

        if (m_ReadOnly)
            return false;

        if (m_TrackID != trackID)
            return false;

        int compareCount = Interlocked.CompareExchange(ref m_CurrentCount, expectedCount, oldCount);

        if (compareCount != oldCount)
        {
            conflict = true;
            return false;
        }

        var queue = m_GlobalQueue;
        var firstSlot = m_Offset + oldCount;

        // Copy exactly as many items as slots were reserved above.
        for (var i = 0; i < newItemCount; i++)
        {
            queue[firstSlot + i] = items[i];
        }

        return true;
    }

    /// <summary>
    /// Stops the enqueue, and then waits until all currently executing enqueue calls have exited.
    /// </summary>
    public void StopEnqueue()
    {
        if (m_ReadOnly)
            return;

        m_ReadOnly = true;

        if (m_UpdatingCount <= 0)
            return;

        var spinWait = new SpinWait();

        spinWait.SpinOnce();

        // Wait until all in-flight insertions are finished
        while (m_UpdatingCount > 0)
        {
            spinWait.SpinOnce();
        }
    }

    /// <summary>
    /// Starts to allow enqueue.
    /// </summary>
    public void StartEnqueue()
    {
        m_ReadOnly = false;
    }

    /// <summary>
    /// Gets the global queue, i.e. the shared array passed to the constructor.
    /// </summary>
    /// <value>
    /// The global queue.
    /// </value>
    public ArraySegment<byte>[] GlobalQueue
    {
        get { return m_GlobalQueue; }
    }

    /// <summary>
    /// Gets the offset of this queue's first slot inside <see cref="GlobalQueue"/>.
    /// </summary>
    /// <value>
    /// The offset.
    /// </value>
    public int Offset
    {
        get { return m_Offset; }
    }

    /// <summary>
    /// Gets the capacity, i.e. the maximum number of slots this queue may reserve.
    /// </summary>
    /// <value>
    /// The capacity.
    /// </value>
    public int Capacity
    {
        get { return m_Capacity; }
    }

    /// <summary>
    /// Gets the number of segments currently visible, i.e. enqueued and not hidden by
    /// <see cref="InternalTrim"/>.
    /// </summary>
    public int Count
    {
        get { return m_CurrentCount - m_InnerOffset; }
    }

    /// <summary>
    /// Gets or sets the position. The queue itself only resets it to 0 in <see cref="Clear"/>.
    /// </summary>
    /// <value>
    /// The position.
    /// </value>
    public int Position { get; set; }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <param name="item">The segment to locate.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public int IndexOf(ArraySegment<byte> item)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <param name="index">The zero-based index at which the item would be inserted.</param>
    /// <param name="item">The segment to insert.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Insert(int index, ArraySegment<byte> item)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <param name="index">The zero-based index of the item to remove.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void RemoveAt(int index)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Gets the visible segment at the specified index. Setting is not supported.
    /// </summary>
    /// <param name="index">The zero-based index among the visible segments.</param>
    /// <returns>
    /// The segment. If the slot has not been written yet, the getter spins for a bounded
    /// number of iterations and may then return a segment whose <c>Array</c> is still null.
    /// </returns>
    /// <exception cref="NotSupportedException">Thrown by the setter.</exception>
    public ArraySegment<byte> this[int index]
    {
        get
        {
            var targetIndex = m_Offset + m_InnerOffset + index;
            var value = m_GlobalQueue[targetIndex];

            if (value.Array != null)
                return value;

            // The slot may be reserved but not written yet; give the writer a moment.
            var spinWait = new SpinWait();

            while (true)
            {
                spinWait.SpinOnce();
                value = m_GlobalQueue[targetIndex];

                if (value.Array != null)
                    return value;

                // Give up after a bounded number of spins and return whatever is there.
                if (spinWait.Count > 50)
                    return value;
            }
        }
        set
        {
            throw new NotSupportedException();
        }
    }

    /// <summary>
    /// Not supported; use <c>Enqueue</c> instead.
    /// </summary>
    /// <param name="item">The segment to add.</param>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public void Add(ArraySegment<byte> item)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Removes all items from the queue and advances <see cref="TrackID"/>
    /// (wrapping from <see cref="ushort.MaxValue"/> back to 1).
    /// </summary>
    public void Clear()
    {
        // Advance the track ID first so enqueue attempts still holding the previous ID are rejected.
        if (m_TrackID >= ushort.MaxValue)
            m_TrackID = 1;
        else
            m_TrackID++;

        // Reset every slot this queue has used back to the default (empty) segment.
        Array.Clear(m_GlobalQueue, m_Offset, m_CurrentCount);

        m_CurrentCount = 0;
        m_InnerOffset = 0;
        Position = 0;
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <param name="item">The segment to locate.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public bool Contains(ArraySegment<byte> item)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Copies the visible segments to <paramref name="array"/>.
    /// </summary>
    /// <param name="array">The destination array.</param>
    /// <param name="arrayIndex">The index in <paramref name="array"/> at which copying begins.</param>
    public void CopyTo(ArraySegment<byte>[] array, int arrayIndex)
    {
        for (var i = 0; i < Count; i++)
        {
            array[arrayIndex + i] = this[i];
        }
    }

    /// <summary>
    /// Gets a value indicating whether enqueueing is currently stopped
    /// (see <see cref="StopEnqueue"/> and <see cref="StartEnqueue"/>).
    /// </summary>
    public bool IsReadOnly
    {
        get { return m_ReadOnly; }
    }

    /// <summary>
    /// Not supported.
    /// </summary>
    /// <param name="item">The segment to remove.</param>
    /// <returns>Never returns.</returns>
    /// <exception cref="NotSupportedException">Always thrown.</exception>
    public bool Remove(ArraySegment<byte> item)
    {
        throw new NotSupportedException();
    }

    /// <summary>
    /// Returns an enumerator that iterates through the visible segments.
    /// </summary>
    /// <returns>An enumerator over the visible segments.</returns>
    public IEnumerator<ArraySegment<byte>> GetEnumerator()
    {
        for (var i = 0; i < (m_CurrentCount - m_InnerOffset); i++)
        {
            yield return m_GlobalQueue[m_Offset + m_InnerOffset + i];
        }
    }

    /// <summary>
    /// Returns an enumerator that iterates through the visible segments.
    /// </summary>
    /// <returns>An enumerator over the visible segments.</returns>
    System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
    {
        return GetEnumerator();
    }

    /// <summary>
    /// Trims the visible segments at the beginning by the given binary data size.
    /// </summary>
    /// <param name="offset">
    /// The number of bytes to drop, counted from the start of the first visible segment.
    /// </param>
    /// <remarks>
    /// Segments that are consumed completely are hidden, and the segment that contains the
    /// cut is replaced by its remainder. If <paramref name="offset"/> is not smaller than the
    /// total size of the visible segments, the queue is left unchanged.
    /// </remarks>
    public void InternalTrim(int offset)
    {
        var subTotal = 0;

        // i is a slot index relative to m_Offset, so the scan must run up to m_CurrentCount.
        // Bounding it by the visible count would skip the tail once m_InnerOffset is non-zero.
        for (var i = m_InnerOffset; i < m_CurrentCount; i++)
        {
            var segment = m_GlobalQueue[m_Offset + i];
            subTotal += segment.Count;

            // This segment lies entirely inside the trimmed range.
            if (subTotal <= offset)
                continue;

            m_InnerOffset = i;

            // Keep only the tail of the segment that straddles the cut.
            var rest = subTotal - offset;
            m_GlobalQueue[m_Offset + i] = new ArraySegment<byte>(segment.Array, segment.Offset + segment.Count - rest, rest);

            break;
        }
    }
}
/// <summary>
/// Creates the shared backing array for a batch of <see cref="SendingQueue"/> instances
/// together with the queues themselves.
/// </summary>
// NOTE(review): the generic type argument of the interface is inferred from the signature of
// Create (out SendingQueue[]); confirm it against the declaration of ISmartPoolSourceCreator.
public class SendingQueueSourceCreator : ISmartPoolSourceCreator<SendingQueue>
{
    // Number of slots given to each created queue.
    private readonly int m_SendingQueueSize;

    /// <summary>
    /// Initializes a new instance of the <see cref="SendingQueueSourceCreator"/> class.
    /// </summary>
    /// <param name="sendingQueueSize">Size of the sending queue, i.e. the capacity of each created queue.</param>
    public SendingQueueSourceCreator(int sendingQueueSize)
    {
        m_SendingQueueSize = sendingQueueSize;
    }

    /// <summary>
    /// Creates <paramref name="size"/> queues that share one backing array.
    /// </summary>
    /// <param name="size">The number of queues to create.</param>
    /// <param name="poolItems">Receives the created queues.</param>
    /// <returns>A pool source constructed from the shared backing array and <paramref name="size"/>.</returns>
    public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
    {
        // One array backs all queues; queue i owns the slice starting at i * m_SendingQueueSize.
        var source = new ArraySegment<byte>[size * m_SendingQueueSize];

        poolItems = new SendingQueue[size];

        for (var i = 0; i < size; i++)
        {
            poolItems[i] = new SendingQueue(source, i * m_SendingQueueSize, m_SendingQueueSize);
        }

        return new SmartPoolSource(source, size);
    }
}
}