SendingQueue.cs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq;
  4. using System.Text;
  5. using System.Threading;
  6. namespace SuperSocket.Common
  7. {
  /// <summary>
  /// A fixed-capacity queue of byte segments that lives in a window
  /// (<see cref="Offset" />, <see cref="Capacity" />) of an array supplied by the caller.
  /// Items are appended through <see cref="Enqueue(ArraySegment{byte}, ushort)" /> using
  /// interlocked operations; the list-style mutators of <see cref="IList{T}" />
  /// (Add, Insert, Remove, RemoveAt, indexer setter) and the lookups IndexOf/Contains
  /// throw <see cref="NotSupportedException" />.
  /// </summary>
  public sealed class SendingQueue : IList<ArraySegment<byte>>
  {
      // Value written into a slot to mark it as empty.
      private static readonly ArraySegment<byte> m_Null = default(ArraySegment<byte>);

      private readonly int m_Offset;
      private readonly int m_Capacity;
      private readonly ArraySegment<byte>[] m_GlobalQueue;

      // Number of slots handed out so far (counted from m_Offset).
      private int m_CurrentCount = 0;

      // Number of Enqueue calls currently in progress.
      private int m_UpdatingCount;

      // Written by StopEnqueue/StartEnqueue and polled by enqueuing threads,
      // hence volatile so that a change is observed promptly.
      private volatile bool m_ReadOnly = false;

      private ushort m_TrackID = 1;

      // Number of leading slots that InternalTrim has already consumed.
      private int m_InnerOffset = 0;

      /// <summary>
      /// Gets the track ID. Enqueue calls must pass the current value;
      /// <see cref="Clear" /> changes it, so callers holding a stale ID are rejected.
      /// </summary>
      /// <value>
      /// The track ID.
      /// </value>
      public ushort TrackID
      {
          get { return m_TrackID; }
      }

      /// <summary>
      /// Initializes a new instance of the <see cref="SendingQueue" /> class.
      /// </summary>
      /// <param name="globalQueue">The array that stores the items of this queue.</param>
      /// <param name="offset">The index in <paramref name="globalQueue" /> of this queue's first slot.</param>
      /// <param name="capacity">The maximum number of items this queue accepts.</param>
      public SendingQueue(ArraySegment<byte>[] globalQueue, int offset, int capacity)
      {
          m_GlobalQueue = globalQueue;
          m_Offset = offset;
          m_Capacity = capacity;
      }

      /// <summary>
      /// Makes one attempt to reserve a slot and store <paramref name="item" /> in it.
      /// </summary>
      /// <param name="item">The item to store.</param>
      /// <param name="conflict">
      /// Set to true when the attempt failed only because another thread changed the
      /// count in the meantime, i.e. a retry may succeed.
      /// </param>
      /// <param name="trackID">The track ID the caller believes is current.</param>
      /// <returns>true if the item was stored; otherwise, false.</returns>
      private bool TryEnqueue(ArraySegment<byte> item, out bool conflict, ushort trackID)
      {
          conflict = false;

          var oldCount = m_CurrentCount;

          if (oldCount >= Capacity)
              return false;

          if (m_ReadOnly)
              return false;

          if (trackID != m_TrackID)
              return false;

          // Reserve the slot by publishing the new count; the item is stored afterwards.
          if (Interlocked.CompareExchange(ref m_CurrentCount, oldCount + 1, oldCount) != oldCount)
          {
              conflict = true;
              return false;
          }

          m_GlobalQueue[m_Offset + oldCount] = item;
          return true;
      }

      /// <summary>
      /// Enqueues the specified item.
      /// </summary>
      /// <param name="item">The item.</param>
      /// <param name="trackID">The track ID; must equal <see cref="TrackID" />.</param>
      /// <returns>
      /// true if the item was stored; false if the queue is read-only, full,
      /// or <paramref name="trackID" /> does not match.
      /// </returns>
      public bool Enqueue(ArraySegment<byte> item, ushort trackID)
      {
          if (m_ReadOnly)
              return false;

          Interlocked.Increment(ref m_UpdatingCount);

          // try/finally guarantees the in-progress counter is released even when the
          // store throws; otherwise StopEnqueue would wait forever.
          try
          {
              while (!m_ReadOnly)
              {
                  bool conflict;

                  if (TryEnqueue(item, out conflict, trackID))
                      return true;

                  // Only a lost race is worth retrying.
                  if (!conflict)
                      break;
              }

              return false;
          }
          finally
          {
              Interlocked.Decrement(ref m_UpdatingCount);
          }
      }

      /// <summary>
      /// Enqueues the specified items as one contiguous run; either all of them
      /// are accepted or none.
      /// </summary>
      /// <param name="items">The items.</param>
      /// <param name="trackID">The track ID; must equal <see cref="TrackID" />.</param>
      /// <returns>
      /// true if the items were stored; false if the queue is read-only, lacks room
      /// for all of them, or <paramref name="trackID" /> does not match.
      /// </returns>
      public bool Enqueue(IList<ArraySegment<byte>> items, ushort trackID)
      {
          if (m_ReadOnly)
              return false;

          Interlocked.Increment(ref m_UpdatingCount);

          // See the single-item overload: the counter must be released on every path.
          try
          {
              while (!m_ReadOnly)
              {
                  bool conflict;

                  if (TryEnqueue(items, out conflict, trackID))
                      return true;

                  if (!conflict)
                      break;
              }

              return false;
          }
          finally
          {
              Interlocked.Decrement(ref m_UpdatingCount);
          }
      }

      /// <summary>
      /// Makes one attempt to reserve room for all <paramref name="items" /> and store them.
      /// </summary>
      /// <param name="items">The items to store.</param>
      /// <param name="conflict">
      /// Set to true when the attempt failed only because another thread changed the
      /// count in the meantime, i.e. a retry may succeed.
      /// </param>
      /// <param name="trackID">The track ID the caller believes is current.</param>
      /// <returns>true if the items were stored; otherwise, false.</returns>
      private bool TryEnqueue(IList<ArraySegment<byte>> items, out bool conflict, ushort trackID)
      {
          conflict = false;

          var oldCount = m_CurrentCount;
          int newItemCount = items.Count;
          int expectedCount = oldCount + newItemCount;

          if (expectedCount > Capacity)
              return false;

          if (m_ReadOnly)
              return false;

          if (m_TrackID != trackID)
              return false;

          if (Interlocked.CompareExchange(ref m_CurrentCount, expectedCount, oldCount) != oldCount)
          {
              conflict = true;
              return false;
          }

          var queue = m_GlobalQueue;
          var firstSlot = m_Offset + oldCount;

          for (var i = 0; i < newItemCount; i++)
          {
              queue[firstSlot + i] = items[i];
          }

          return true;
      }

      /// <summary>
      /// Stops the enqueue, and then waits until all currently executing enqueue calls have exited.
      /// </summary>
      public void StopEnqueue()
      {
          if (m_ReadOnly)
              return;

          m_ReadOnly = true;

          // Read the counter with an interlocked operation: it acts as a full fence, so
          // the write to m_ReadOnly above cannot be reordered after this read and an
          // enqueue that started concurrently is not missed.
          if (Interlocked.CompareExchange(ref m_UpdatingCount, 0, 0) <= 0)
              return;

          var spinWait = new SpinWait();

          // Wait until all in-flight insertions are finished.
          do
          {
              spinWait.SpinOnce();
          }
          while (Interlocked.CompareExchange(ref m_UpdatingCount, 0, 0) > 0);
      }

      /// <summary>
      /// Starts to allow enqueue.
      /// </summary>
      public void StartEnqueue()
      {
          m_ReadOnly = false;
      }

      /// <summary>
      /// Gets the global queue, i.e. the array passed to the constructor.
      /// </summary>
      /// <value>
      /// The global queue.
      /// </value>
      public ArraySegment<byte>[] GlobalQueue
      {
          get { return m_GlobalQueue; }
      }

      /// <summary>
      /// Gets the index of this queue's first slot inside <see cref="GlobalQueue" />.
      /// </summary>
      /// <value>
      /// The offset.
      /// </value>
      public int Offset
      {
          get { return m_Offset; }
      }

      /// <summary>
      /// Gets the maximum number of items this queue accepts.
      /// </summary>
      /// <value>
      /// The capacity.
      /// </value>
      public int Capacity
      {
          get { return m_Capacity; }
      }

      /// <summary>
      /// Gets the number of items in the queue that have not been trimmed away
      /// by <see cref="InternalTrim" />.
      /// </summary>
      /// <returns>The number of remaining items.</returns>
      public int Count
      {
          get { return m_CurrentCount - m_InnerOffset; }
      }

      /// <summary>
      /// Gets or sets the position. The queue itself only resets it to 0 in <see cref="Clear" />.
      /// </summary>
      /// <value>
      /// The position.
      /// </value>
      public int Position { get; set; }

      /// <summary>
      /// Not supported.
      /// </summary>
      /// <param name="item">The object to locate.</param>
      /// <returns>Never returns.</returns>
      /// <exception cref="System.NotSupportedException">Always thrown.</exception>
      public int IndexOf(ArraySegment<byte> item)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Not supported.
      /// </summary>
      /// <param name="index">The zero-based index at which <paramref name="item" /> should be inserted.</param>
      /// <param name="item">The object to insert.</param>
      /// <exception cref="System.NotSupportedException">Always thrown.</exception>
      public void Insert(int index, ArraySegment<byte> item)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Not supported.
      /// </summary>
      /// <param name="index">The zero-based index of the item to remove.</param>
      /// <exception cref="System.NotSupportedException">Always thrown.</exception>
      public void RemoveAt(int index)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Gets the element at the specified index, counted from the first item that has
      /// not been trimmed away. Setting an element is not supported.
      /// </summary>
      /// <param name="index">The index.</param>
      /// <returns>
      /// The segment stored in the slot. If the slot is still empty after a short
      /// spin-wait, the empty (default) segment is returned.
      /// </returns>
      /// <exception cref="System.NotSupportedException">Thrown by the setter.</exception>
      public ArraySegment<byte> this[int index]
      {
          get
          {
              // NOTE: index is not range-checked against Count; the slot is read as-is.
              var targetIndex = m_Offset + m_InnerOffset + index;
              var value = m_GlobalQueue[targetIndex];

              if (value.Array != null)
                  return value;

              // TryEnqueue publishes the new count before it stores the item, so a
              // counted slot can still be empty for a moment. Spin briefly for it.
              var spinWait = new SpinWait();

              while (true)
              {
                  spinWait.SpinOnce();
                  value = m_GlobalQueue[targetIndex];

                  if (value.Array != null)
                      return value;

                  // Give up after a bounded number of spins.
                  if (spinWait.Count > 50)
                      return value;
              }
          }
          set
          {
              throw new NotSupportedException();
          }
      }

      /// <summary>
      /// Not supported; use <see cref="Enqueue(ArraySegment{byte}, ushort)" /> instead.
      /// </summary>
      /// <param name="item">The object to add.</param>
      /// <exception cref="System.NotSupportedException">Always thrown.</exception>
      public void Add(ArraySegment<byte> item)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Empties the queue: advances the track ID (wrapping from
      /// <see cref="ushort.MaxValue" /> back to 1), resets every used slot to the
      /// empty segment and resets the count, the trim offset and <see cref="Position" />.
      /// </summary>
      public void Clear()
      {
          if (m_TrackID >= ushort.MaxValue)
              m_TrackID = 1;
          else
              m_TrackID++;

          for (var i = 0; i < m_CurrentCount; i++)
          {
              m_GlobalQueue[m_Offset + i] = m_Null;
          }

          m_CurrentCount = 0;
          m_InnerOffset = 0;
          Position = 0;
      }

      /// <summary>
      /// Not supported.
      /// </summary>
      /// <param name="item">The object to locate.</param>
      /// <returns>Never returns.</returns>
      /// <exception cref="System.NotSupportedException">Always thrown.</exception>
      public bool Contains(ArraySegment<byte> item)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Copies the remaining items to <paramref name="array" />.
      /// </summary>
      /// <param name="array">The destination array.</param>
      /// <param name="arrayIndex">The index in <paramref name="array" /> at which copying begins.</param>
      public void CopyTo(ArraySegment<byte>[] array, int arrayIndex)
      {
          for (var i = 0; i < Count; i++)
          {
              array[arrayIndex + i] = this[i];
          }
      }

      /// <summary>
      /// Gets a value indicating whether enqueuing is currently stopped
      /// (see <see cref="StopEnqueue" /> and <see cref="StartEnqueue" />).
      /// </summary>
      /// <returns>true if enqueue calls are rejected; otherwise, false.</returns>
      public bool IsReadOnly
      {
          get { return m_ReadOnly; }
      }

      /// <summary>
      /// Not supported.
      /// </summary>
      /// <param name="item">The object to remove.</param>
      /// <returns>Never returns.</returns>
      /// <exception cref="System.NotSupportedException">Always thrown.</exception>
      public bool Remove(ArraySegment<byte> item)
      {
          throw new NotSupportedException();
      }

      /// <summary>
      /// Returns an enumerator that iterates through the remaining items.
      /// </summary>
      /// <returns>
      /// A <see cref="T:System.Collections.Generic.IEnumerator`1" /> that can be used to iterate through the collection.
      /// </returns>
      public IEnumerator<ArraySegment<byte>> GetEnumerator()
      {
          // Go through the indexer so enumeration treats a counted-but-not-yet-written
          // slot the same way as this[int] and CopyTo do.
          for (var i = 0; i < Count; i++)
          {
              yield return this[i];
          }
      }

      /// <summary>
      /// Returns an enumerator that iterates through the remaining items.
      /// </summary>
      /// <returns>
      /// An <see cref="T:System.Collections.IEnumerator" /> object that can be used to iterate through the collection.
      /// </returns>
      System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
      {
          return GetEnumerator();
      }

      /// <summary>
      /// Trims the internal segments at the beginning by the given binary data size:
      /// segments that are covered completely are skipped, and the first segment that is
      /// covered only partially is replaced by its remaining tail.
      /// </summary>
      /// <param name="offset">The number of bytes that should be trimmed at the beginning.</param>
      /// <exception cref="System.ArgumentOutOfRangeException"><paramref name="offset" /> is negative.</exception>
      public void InternalTrim(int offset)
      {
          if (offset < 0)
              throw new ArgumentOutOfRangeException("offset");

          // i is an absolute slot index (it starts at m_InnerOffset), so the loop
          // must run up to the absolute end m_CurrentCount, not the remaining count.
          var end = m_CurrentCount;
          var subTotal = 0;

          for (var i = m_InnerOffset; i < end; i++)
          {
              var segment = m_GlobalQueue[m_Offset + i];
              subTotal += segment.Count;

              // This segment is consumed completely.
              if (subTotal <= offset)
                  continue;

              m_InnerOffset = i;

              // Keep only the unconsumed tail of the partially consumed segment.
              var rest = subTotal - offset;
              m_GlobalQueue[m_Offset + i] = new ArraySegment<byte>(segment.Array, segment.Offset + segment.Count - rest, rest);
              return;
          }

          // Every remaining segment was consumed: the queue is now empty.
          m_InnerOffset = end;
      }
  }
  /// <summary>
  /// Pool source creator that builds batches of <see cref="SendingQueue" /> instances
  /// which all store their items in one shared segment array.
  /// </summary>
  public class SendingQueueSourceCreator : ISmartPoolSourceCreator<SendingQueue>
  {
      private int m_SendingQueueSize;

      /// <summary>
      /// Initializes a new instance of the <see cref="SendingQueueSourceCreator" /> class.
      /// </summary>
      /// <param name="sendingQueueSize">The capacity given to every created sending queue.</param>
      public SendingQueueSourceCreator(int sendingQueueSize)
      {
          m_SendingQueueSize = sendingQueueSize;
      }

      /// <summary>
      /// Allocates one array large enough for <paramref name="size" /> queues and creates
      /// the queues over consecutive, non-overlapping windows of that array.
      /// </summary>
      /// <param name="size">The number of sending queues to create.</param>
      /// <param name="poolItems">Receives the created sending queues.</param>
      /// <returns>The pool source wrapping the shared array.</returns>
      public ISmartPoolSource Create(int size, out SendingQueue[] poolItems)
      {
          var queueCapacity = m_SendingQueueSize;
          var sharedSegments = new ArraySegment<byte>[size * queueCapacity];

          poolItems = new SendingQueue[size];

          // Each queue owns the window that starts right after the previous queue's window.
          var windowStart = 0;

          for (var index = 0; index < size; index++)
          {
              poolItems[index] = new SendingQueue(sharedSegments, windowStart, queueCapacity);
              windowStart += queueCapacity;
          }

          return new SmartPoolSource(sharedSegments, size);
      }
  }
  419. }