Update SQLAlchemy

This commit is contained in:
Ruud
2013-06-14 11:00:06 +02:00
parent 267ecfacab
commit 4aa6700ceb
124 changed files with 6500 additions and 5207 deletions
+35 -1
View File
@@ -1,5 +1,5 @@
# util/queue.py
# Copyright (C) 2005-2012 the SQLAlchemy authors and contributors <see AUTHORS file>
# Copyright (C) 2005-2013 the SQLAlchemy authors and contributors <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: http://www.opensource.org/licenses/mit-license.php
@@ -16,6 +16,15 @@ condition."""
from collections import deque
from time import time as _time
from sqlalchemy.util import threading
import sys
if sys.version_info < (2, 6):
def notify_all(condition):
condition.notify()
else:
def notify_all(condition):
condition.notify_all()
__all__ = ['Empty', 'Full', 'Queue']
@@ -29,6 +38,11 @@ class Full(Exception):
pass
class SAAbort(Exception):
"Special SQLA exception to abort waiting"
def __init__(self, context):
self.context = context
class Queue:
def __init__(self, maxsize=0):
"""Initialize a queue object with a given maximum size.
@@ -49,6 +63,9 @@ class Queue:
# a thread waiting to put is notified then.
self.not_full = threading.Condition(self.mutex)
# when this is set, SAAbort is raised within get().
self._sqla_abort_context = False
def qsize(self):
"""Return the approximate size of the queue (not reliable!)."""
@@ -138,6 +155,8 @@ class Queue:
elif timeout is None:
while self._empty():
self.not_empty.wait()
if self._sqla_abort_context:
raise SAAbort(self._sqla_abort_context)
else:
if timeout < 0:
raise ValueError("'timeout' must be a positive number")
@@ -147,12 +166,27 @@ class Queue:
if remaining <= 0.0:
raise Empty
self.not_empty.wait(remaining)
if self._sqla_abort_context:
raise SAAbort(self._sqla_abort_context)
item = self._get()
self.not_full.notify()
return item
finally:
self.not_empty.release()
def abort(self, context):
"""Issue an 'abort', will force any thread waiting on get()
to stop waiting and raise SAAbort.
"""
self._sqla_abort_context = context
if not self.not_full.acquire(False):
return
try:
notify_all(self.not_empty)
finally:
self.not_full.release()
def get_nowait(self):
"""Remove and return an item from the queue without blocking.