gluon/contrib/pg8000/ thanks Mariano

This commit is contained in:
Massimo Di Pierro
2012-01-12 13:59:00 -06:00
parent f032cf0df5
commit 43ee14375b
8 changed files with 3747 additions and 1 deletions
+1 -1
View File
@@ -1 +1 @@
Version 1.99.4 (2012-01-12 13:22:21) stable
Version 1.99.4 (2012-01-12 13:58:45) stable
+37
View File
@@ -0,0 +1,37 @@
# vim: sw=4:expandtab:foldmethod=marker
#
# Copyright (c) 2007-2009, Mathieu Fenniak
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__author__ = "Mathieu Fenniak"
import dbapi as DBAPI
pg8000_dbapi = DBAPI
from interface import *
from types import Bytea
+795
View File
@@ -0,0 +1,795 @@
# vim: sw=4:expandtab:foldmethod=marker
#
# Copyright (c) 2007-2009, Mathieu Fenniak
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__author__ = "Mathieu Fenniak"
__version__ = "1.10"
import datetime
import time
import interface
import types
import threading
from errors import *
from warnings import warn
##
# The DBAPI level supported. Currently 2.0. This property is part of the
# DBAPI 2.0 specification.
apilevel = "2.0"
##
# Integer constant stating the level of thread safety the DBAPI interface
# supports. This DBAPI interface supports sharing of the module, connections,
# and cursors. This property is part of the DBAPI 2.0 specification.
threadsafety = 3
##
# String property stating the type of parameter marker formatting expected by
# the interface. This value defaults to "format". This property is part of
# the DBAPI 2.0 specification.
# <p>
# Unlike the DBAPI specification, this value is not constant. It can be
# changed to any standard paramstyle value (ie. qmark, numeric, named, format,
# and pyformat).
paramstyle = 'format' # paramstyle can be changed to any DB-API paramstyle
def convert_paramstyle(src_style, query, args):
# I don't see any way to avoid scanning the query string char by char,
# so we might as well take that careful approach and create a
# state-based scanner. We'll use int variables for the state.
# 0 -- outside quoted string
# 1 -- inside single-quote string '...'
# 2 -- inside quoted identifier "..."
# 3 -- inside escaped single-quote string, E'...'
state = 0
output_query = ""
output_args = []
if src_style == "numeric":
output_args = args
elif src_style in ("pyformat", "named"):
mapping_to_idx = {}
i = 0
while 1:
if i == len(query):
break
c = query[i]
# print "begin loop", repr(i), repr(c), repr(state)
if state == 0:
if c == "'":
i += 1
output_query += c
state = 1
elif c == '"':
i += 1
output_query += c
state = 2
elif c == 'E':
# check for escaped single-quote string
i += 1
if i < len(query) and i > 1 and query[i] == "'":
i += 1
output_query += "E'"
state = 3
else:
output_query += c
elif src_style == "qmark" and c == "?":
i += 1
param_idx = len(output_args)
if param_idx == len(args):
raise QueryParameterIndexError("too many parameter fields, not enough parameters")
output_args.append(args[param_idx])
output_query += "$" + str(param_idx + 1)
elif src_style == "numeric" and c == ":":
i += 1
if i < len(query) and i > 1 and query[i].isdigit():
output_query += "$" + query[i]
i += 1
else:
raise QueryParameterParseError("numeric parameter : does not have numeric arg")
elif src_style == "named" and c == ":":
name = ""
while 1:
i += 1
if i == len(query):
break
c = query[i]
if c.isalnum() or c == '_':
name += c
else:
break
if name == "":
raise QueryParameterParseError("empty name of named parameter")
idx = mapping_to_idx.get(name)
if idx == None:
idx = len(output_args)
output_args.append(args[name])
idx += 1
mapping_to_idx[name] = idx
output_query += "$" + str(idx)
elif src_style == "format" and c == "%":
i += 1
if i < len(query) and i > 1:
if query[i] == "s":
param_idx = len(output_args)
if param_idx == len(args):
raise QueryParameterIndexError("too many parameter fields, not enough parameters")
output_args.append(args[param_idx])
output_query += "$" + str(param_idx + 1)
elif query[i] == "%":
output_query += "%"
else:
raise QueryParameterParseError("Only %s and %% are supported")
i += 1
else:
raise QueryParameterParseError("format parameter % does not have format code")
elif src_style == "pyformat" and c == "%":
i += 1
if i < len(query) and i > 1:
if query[i] == "(":
i += 1
# begin mapping name
end_idx = query.find(')', i)
if end_idx == -1:
raise QueryParameterParseError("began pyformat dict read, but couldn't find end of name")
else:
name = query[i:end_idx]
i = end_idx + 1
if i < len(query) and query[i] == "s":
i += 1
idx = mapping_to_idx.get(name)
if idx == None:
idx = len(output_args)
output_args.append(args[name])
idx += 1
mapping_to_idx[name] = idx
output_query += "$" + str(idx)
else:
raise QueryParameterParseError("format not specified or not supported (only %(...)s supported)")
elif query[i] == "%":
output_query += "%"
elif query[i] == "s":
# we have a %s in a pyformat query string. Assume
# support for format instead.
i -= 1
src_style = "format"
else:
raise QueryParameterParseError("Only %(name)s, %s and %% are supported")
else:
i += 1
output_query += c
elif state == 1:
output_query += c
i += 1
if c == "'":
# Could be a double ''
if i < len(query) and query[i] == "'":
# is a double quote.
output_query += query[i]
i += 1
else:
state = 0
elif src_style in ("pyformat","format") and c == "%":
# hm... we're only going to support an escaped percent sign
if i < len(query):
if query[i] == "%":
# good. We already output the first percent sign.
i += 1
else:
raise QueryParameterParseError("'%" + query[i] + "' not supported in quoted string")
elif state == 2:
output_query += c
i += 1
if c == '"':
state = 0
elif src_style in ("pyformat","format") and c == "%":
# hm... we're only going to support an escaped percent sign
if i < len(query):
if query[i] == "%":
# good. We already output the first percent sign.
i += 1
else:
raise QueryParameterParseError("'%" + query[i] + "' not supported in quoted string")
elif state == 3:
output_query += c
i += 1
if c == "\\":
# check for escaped single-quote
if i < len(query) and query[i] == "'":
output_query += "'"
i += 1
elif c == "'":
state = 0
elif src_style in ("pyformat","format") and c == "%":
# hm... we're only going to support an escaped percent sign
if i < len(query):
if query[i] == "%":
# good. We already output the first percent sign.
i += 1
else:
raise QueryParameterParseError("'%" + query[i] + "' not supported in quoted string")
return output_query, tuple(output_args)
def require_open_cursor(fn):
def _fn(self, *args, **kwargs):
if self.cursor == None:
raise CursorClosedError()
return fn(self, *args, **kwargs)
return _fn
##
# The class of object returned by the {@link #ConnectionWrapper.cursor cursor method}.
class CursorWrapper(object):
def __init__(self, conn, connection):
self.cursor = interface.Cursor(conn)
self.arraysize = 1
self._connection = connection
self._override_rowcount = None
##
# This read-only attribute returns a reference to the connection object on
# which the cursor was created.
# <p>
# Stability: Part of a DBAPI 2.0 extension. A warning "DB-API extension
# cursor.connection used" will be fired.
connection = property(lambda self: self._getConnection())
def _getConnection(self):
warn("DB-API extension cursor.connection used", stacklevel=3)
return self._connection
##
# This read-only attribute specifies the number of rows that the last
# .execute*() produced (for DQL statements like 'select') or affected (for
# DML statements like 'update' or 'insert').
# <p>
# The attribute is -1 in case no .execute*() has been performed on the
# cursor or the rowcount of the last operation is cannot be determined by
# the interface.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
rowcount = property(lambda self: self._getRowCount())
@require_open_cursor
def _getRowCount(self):
if self._override_rowcount != None:
return self._override_rowcount
return self.cursor.row_count
##
# This read-only attribute is a sequence of 7-item sequences. Each value
# contains information describing one result column. The 7 items returned
# for each column are (name, type_code, display_size, internal_size,
# precision, scale, null_ok). Only the first two values are provided by
# this interface implementation.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
description = property(lambda self: self._getDescription())
@require_open_cursor
def _getDescription(self):
if self.cursor.row_description == None:
return None
columns = []
for col in self.cursor.row_description:
columns.append((col["name"], col["type_oid"], None, None, None, None, None))
return columns
##
# Executes a database operation. Parameters may be provided as a sequence
# or mapping and will be bound to variables in the operation.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_cursor
def execute(self, operation, args=()):
if not self._connection.in_transaction:
self._connection.begin()
self._override_rowcount = None
self._execute(operation, args)
def _execute(self, operation, args=()):
new_query, new_args = convert_paramstyle(paramstyle, operation, args)
try:
self.cursor.execute(new_query, *new_args)
except ConnectionClosedError:
# can't rollback in this case
raise
except:
# any error will rollback the transaction to-date
self._connection.rollback()
raise
def copy_from(self, fileobj, table=None, sep='\t', null=None, query=None):
if query == None:
if table == None:
raise CopyQueryOrTableRequiredError()
query = "COPY %s FROM stdout DELIMITER '%s'" % (table, sep)
if null is not None:
query += " NULL '%s'" % (null,)
self.copy_execute(fileobj, query)
def copy_to(self, fileobj, table=None, sep='\t', null=None, query=None):
if query == None:
if table == None:
raise CopyQueryOrTableRequiredError()
query = "COPY %s TO stdout DELIMITER '%s'" % (table, sep)
if null is not None:
query += " NULL '%s'" % (null,)
self.copy_execute(fileobj, query)
@require_open_cursor
def copy_execute(self, fileobj, query):
try:
self.cursor.execute(query, stream=fileobj)
except ConnectionClosedError:
# can't rollback in this case
raise
except:
# any error will rollback the transaction to-date
import traceback; traceback.print_exc()
self._connection.rollback()
raise
##
# Prepare a database operation and then execute it against all parameter
# sequences or mappings provided.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_cursor
def executemany(self, operation, parameter_sets):
if not self._connection.in_transaction:
self._connection.begin()
self._override_rowcount = 0
for parameters in parameter_sets:
self._execute(operation, parameters)
if self.cursor.row_count == -1 or self._override_rowcount == -1:
self._override_rowcount = -1
else:
self._override_rowcount += self.cursor.row_count
##
# Fetch the next row of a query result set, returning a single sequence, or
# None when no more data is available.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_cursor
def fetchone(self):
return self.cursor.read_tuple()
##
# Fetch the next set of rows of a query result, returning a sequence of
# sequences. An empty sequence is returned when no more rows are
# available.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
# @param size The number of rows to fetch when called. If not provided,
# the arraysize property value is used instead.
def fetchmany(self, size=None):
if size == None:
size = self.arraysize
rows = []
for i in range(size):
value = self.fetchone()
if value == None:
break
rows.append(value)
return rows
##
# Fetch all remaining rows of a query result, returning them as a sequence
# of sequences.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_cursor
def fetchall(self):
return tuple(self.cursor.iterate_tuple())
##
# Close the cursor.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_cursor
def close(self):
self.cursor.close()
self.cursor = None
self._override_rowcount = None
def next(self):
warn("DB-API extension cursor.next() used", stacklevel=2)
retval = self.fetchone()
if retval == None:
raise StopIteration()
return retval
def __iter__(self):
warn("DB-API extension cursor.__iter__() used", stacklevel=2)
return self
def setinputsizes(self, sizes):
pass
def setoutputsize(self, size, column=None):
pass
@require_open_cursor
def fileno(self):
return self.cursor.fileno()
@require_open_cursor
def isready(self):
return self.cursor.isready()
def require_open_connection(fn):
def _fn(self, *args, **kwargs):
if self.conn == None:
raise ConnectionClosedError()
return fn(self, *args, **kwargs)
return _fn
##
# The class of object returned by the {@link #connect connect method}.
class ConnectionWrapper(object):
# DBAPI Extension: supply exceptions as attributes on the connection
Warning = property(lambda self: self._getError(Warning))
Error = property(lambda self: self._getError(Error))
InterfaceError = property(lambda self: self._getError(InterfaceError))
DatabaseError = property(lambda self: self._getError(DatabaseError))
OperationalError = property(lambda self: self._getError(OperationalError))
IntegrityError = property(lambda self: self._getError(IntegrityError))
InternalError = property(lambda self: self._getError(InternalError))
ProgrammingError = property(lambda self: self._getError(ProgrammingError))
NotSupportedError = property(lambda self: self._getError(NotSupportedError))
def _getError(self, error):
warn("DB-API extension connection.%s used" % error.__name__, stacklevel=3)
return error
@property
def in_transaction(self):
if self.conn:
return self.conn.in_transaction
return False
def __init__(self, **kwargs):
self.conn = interface.Connection(**kwargs)
self.notifies = []
self.notifies_lock = threading.Lock()
self.conn.NotificationReceived += self._notificationReceived
# Two Phase Commit internal attributes:
self.__tpc_xid = None
self.__tpc_prepared = None
def set_autocommit(self, state):
if self.conn.in_transaction and state and not self.conn.autocommit:
warn("enabling autocommit in an open transaction!")
self.conn.autocommit = state
def get_autocommit(self):
return self.conn.autocommit
autocommit = property(get_autocommit, set_autocommit)
@require_open_connection
def begin(self):
self.conn.begin()
def _notificationReceived(self, notice):
try:
# psycopg2 compatible notification interface
self.notifies_lock.acquire()
self.notifies.append((notice.backend_pid, notice.condition))
finally:
self.notifies_lock.release()
##
# Creates a {@link #CursorWrapper CursorWrapper} object bound to this
# connection.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_connection
def cursor(self):
return CursorWrapper(self.conn, self)
##
# Commits the current database transaction.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_connection
def commit(self):
# There's a threading bug here. If a query is sent after the
# commit, but before the begin, it will be executed immediately
# without a surrounding transaction. Like all threading bugs -- it
# sounds unlikely, until it happens every time in one
# application... however, to fix this, we need to lock the
# database connection entirely, so that no cursors can execute
# statements on other threads. Support for that type of lock will
# be done later.
if self.__tpc_xid:
raise ProgrammingError("Cannot do a normal commit() inside a "
"TPC transaction!")
self.conn.commit()
##
# Rolls back the current database transaction.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_connection
def rollback(self):
# see bug description in commit.
if self.__tpc_xid:
raise ProgrammingError("Cannot do a normal rollback() inside a "
"TPC transaction!")
self.conn.rollback()
##
# Closes the database connection.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
@require_open_connection
def close(self):
self.conn.close()
self.conn = None
##
# Returns the "server_version" string provided by the connected server.
# <p>
# Stability: Extension of the DBAPI 2.0 specification.
@property
@require_open_connection
def server_version(self):
return self.conn.server_version()
# Stability: psycopg2 compatibility
@require_open_connection
def set_client_encoding(self, encoding=None):
"Set the client encoding for the current session"
if encoding:
self.conn.execute("SET client_encoding TO '%s';" % (encoding, ), simple_query=True)
return self.conn.encoding()
def xid(self,format_id, global_transaction_id, branch_qualifier):
"""Create a Transaction IDs (only global_transaction_id is used in pg)
format_id and branch_qualifier are not used in postgres
global_transaction_id may be any string identifier supported by postgres
returns a tuple (format_id, global_transaction_id, branch_qualifier)"""
return (format_id, global_transaction_id, branch_qualifier)
@require_open_connection
def tpc_begin(self,xid):
"Begin a two-phase transaction"
# set auto-commit mode to begin a TPC transaction
self.autocommit = False
# (actually in postgres at this point it is a normal one)
if self.conn.in_transaction:
warn("tpc_begin() should be called outside a transaction block",
stacklevel=3)
self.conn.begin()
# store actual TPC transaction id
self.__tpc_xid = xid
self.__tpc_prepared = False
@require_open_connection
def tpc_prepare(self):
"Prepare a two-phase transaction"
if not self.__tpc_xid:
raise ProgrammingError("tpc_prepare() outside a TPC transaction "
"is not allowed!")
# Prepare the TPC
self.conn.execute("PREPARE TRANSACTION '%s';" % (self.__tpc_xid[1],),
simple_query=True)
self.conn.in_transaction = False
self.__tpc_prepared = True
@require_open_connection
def tpc_commit(self, xid=None):
"Commit a prepared two-phase transaction"
try:
# save current autocommit status (to be recovered later)
previous_autocommit_mode = self.autocommit
if not xid:
# use current tpc transaction
tpc_xid = self.__tpc_xid
else:
# use a recovered tpc transaction
tpc_xid = xid
if not xid in self.tpc_recover():
raise ProgrammingError("Requested TPC transaction is not "
"prepared!")
if not tpc_xid:
raise ProgrammingError("Cannot tpc_commit() without a TPC "
"transaction!")
if self.__tpc_prepared or (xid != self.__tpc_xid and xid):
# a two-phase commit:
# set the auto-commit mode for TPC commit
self.autocommit = True
try:
self.conn.execute("COMMIT PREPARED '%s';" % (tpc_xid[1], ),
simple_query=True)
finally:
# return to previous auto-commit mode
self.autocommit = previous_autocommit_mode
else:
try:
# a single-phase commit
self.conn.commit()
finally:
# return to previous auto-commit mode
self.autocommit = previous_autocommit_mode
finally:
# transaction is done, clear xid
self.__tpc_xid = None
@require_open_connection
def tpc_rollback(self, xid=None):
"Commit a prepared two-phase transaction"
try:
# save current autocommit status (to be recovered later)
previous_autocommit_mode = self.autocommit
if not xid:
# use current tpc transaction
tpc_xid = self.__tpc_xid
else:
# use a recovered tpc transaction
tpc_xid = xid
if not xid in self.tpc_recover():
raise ProgrammingError("Requested TPC transaction is not prepared!")
if not tpc_xid:
raise ProgrammingError("Cannot tpc_rollback() without a TPC prepared transaction!")
if self.__tpc_prepared or (xid != self.__tpc_xid and xid):
# a two-phase rollback
# set auto-commit for the TPC rollback
self.autocommit = True
try:
self.conn.execute("ROLLBACK PREPARED '%s';" % (tpc_xid[1],),
simple_query=True)
finally:
# return to previous auto-commit mode
self.autocommit = previous_autocommit_mode
else:
# a single-phase rollback
try:
self.conn.rollback()
finally:
# return to previous auto-commit mode
self.autocommit = previous_autocommit_mode
finally:
# transaction is done, clear xid
self.__tpc_xid = None
@require_open_connection
def tpc_recover(self):
"Returns a list of pending transaction IDs"
previous_autocommit_mode = self.autocommit
if not self.conn.in_transaction and not self.autocommit:
self.autocommit = True
elif not self.autocommit:
warn("tpc_recover() will open a transaction block", stacklevel=3)
curs = self.cursor()
xids = []
try:
# query system view that stores open (prepared) TPC transactions
curs.execute("SELECT gid FROM pg_prepared_xacts;");
xids.extend([self.xid(0,row[0],'') for row in curs])
finally:
curs.close()
# return to previous auto-commit mode
self.autocommit = previous_autocommit_mode
# return a list of TPC transaction ids (xid)
return xids
##
# Creates a DBAPI 2.0 compatible interface to a PostgreSQL database.
# <p>
# Stability: Part of the DBAPI 2.0 specification.
#
# @param user The username to connect to the PostgreSQL server with. This
# parameter is required.
#
# @keyparam host The hostname of the PostgreSQL server to connect with.
# Providing this parameter is necessary for TCP/IP connections. One of either
# host, or unix_sock, must be provided.
#
# @keyparam unix_sock The path to the UNIX socket to access the database
# through, for example, '/tmp/.s.PGSQL.5432'. One of either unix_sock or host
# must be provided. The port parameter will have no affect if unix_sock is
# provided.
#
# @keyparam port The TCP/IP port of the PostgreSQL server instance. This
# parameter defaults to 5432, the registered and common port of PostgreSQL
# TCP/IP servers.
#
# @keyparam database The name of the database instance to connect with. This
# parameter is optional, if omitted the PostgreSQL server will assume the
# database name is the same as the username.
#
# @keyparam password The user password to connect to the server with. This
# parameter is optional. If omitted, and the database server requests password
# based authentication, the connection will fail. On the other hand, if this
# parameter is provided and the database does not request password
# authentication, then the password will not be used.
#
# @keyparam socket_timeout Socket connect timeout measured in seconds.
# Defaults to 60 seconds.
#
# @keyparam ssl Use SSL encryption for TCP/IP socket. Defaults to False.
#
# @return An instance of {@link #ConnectionWrapper ConnectionWrapper}.
def connect(dsn="", user=None, host=None, unix_sock=None, port=5432, database=None, password=None, socket_timeout=60, ssl=False):
return ConnectionWrapper(dsn=dsn, user=user, host=host,
unix_sock=unix_sock, port=port, database=database,
password=password, socket_timeout=socket_timeout, ssl=ssl)
def Date(year, month, day):
return datetime.date(year, month, day)
def Time(hour, minute, second):
return datetime.time(hour, minute, second)
def Timestamp(year, month, day, hour, minute, second):
return datetime.datetime(year, month, day, hour, minute, second)
def DateFromTicks(ticks):
return Date(*time.localtime(ticks)[:3])
def TimeFromTicks(ticks):
return Time(*time.localtime(ticks)[3:6])
def TimestampFromTicks(ticks):
return Timestamp(*time.localtime(ticks)[:6])
##
# Construct an object holding binary data.
def Binary(value):
return types.Bytea(value)
# I have no idea what this would be used for by a client app. Should it be
# TEXT, VARCHAR, CHAR? It will only compare against row_description's
# type_code if it is this one type. It is the varchar type oid for now, this
# appears to match expectations in the DB API 2.0 compliance test suite.
STRING = 1043
# bytea type_oid
BINARY = 17
# numeric type_oid
NUMBER = 1700
# timestamp type_oid
DATETIME = 1114
# oid type_oid
ROWID = 26
+115
View File
@@ -0,0 +1,115 @@
# vim: sw=4:expandtab:foldmethod=marker
#
# Copyright (c) 2007-2009, Mathieu Fenniak
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__author__ = "Mathieu Fenniak"
class Warning(StandardError):
pass
class Error(StandardError):
pass
class InterfaceError(Error):
pass
class ConnectionClosedError(InterfaceError):
def __init__(self):
InterfaceError.__init__(self, "connection is closed")
class CursorClosedError(InterfaceError):
def __init__(self):
InterfaceError.__init__(self, "cursor is closed")
class DatabaseError(Error):
pass
class DataError(DatabaseError):
pass
class OperationalError(DatabaseError):
pass
class IntegrityError(DatabaseError):
pass
class InternalError(DatabaseError):
pass
class ProgrammingError(DatabaseError):
pass
class NotSupportedError(DatabaseError):
pass
##
# An exception that is thrown when an internal error occurs trying to
# decode binary array data from the server.
class ArrayDataParseError(InternalError):
pass
##
# Thrown when attempting to transmit an array of unsupported data types.
class ArrayContentNotSupportedError(NotSupportedError):
pass
##
# Thrown when attempting to send an array that doesn't contain all the same
# type of objects (eg. some floats, some ints).
class ArrayContentNotHomogenousError(ProgrammingError):
pass
##
# Attempted to pass an empty array in, but it's not possible to determine the
# data type for an empty array.
class ArrayContentEmptyError(ProgrammingError):
pass
##
# Attempted to use a multidimensional array with inconsistent array sizes.
class ArrayDimensionsNotConsistentError(ProgrammingError):
pass
# A cursor's copy_to or copy_from argument was not provided a table or query
# to operate on.
class CopyQueryOrTableRequiredError(ProgrammingError):
pass
# Raised if a COPY query is executed without using copy_to or copy_from
# functions to provide a data stream.
class CopyQueryWithoutStreamError(ProgrammingError):
pass
# When query parameters don't match up with query args.
class QueryParameterIndexError(ProgrammingError):
pass
# Some sort of parse error occured during query parameterization.
class QueryParameterParseError(ProgrammingError):
pass
+660
View File
@@ -0,0 +1,660 @@
# vim: sw=4:expandtab:foldmethod=marker
#
# Copyright (c) 2007-2009, Mathieu Fenniak
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__author__ = "Mathieu Fenniak"
import socket
import protocol
import threading
from errors import *
def conninfo_parse(conninfo):
"Conninfo parser routine based on libpq conninfo_parse"
options = {}
buf = conninfo + " "
tmp = pname = ""
quoted_string = False
cp = 0
while cp < len(buf):
# Skip blanks before the parameter name
c = buf[cp]
if c.isspace() and tmp and not quoted_string and pname:
options[pname] = tmp
tmp = pname = ""
elif c == "'":
quoted_string = not quoted_string
elif c == '\\':
cp += 1
tmp += buf[cp]
elif c == "=":
if not tmp:
raise RuntimeError("missing parameter name (conninfo:%s)" % cp)
pname = tmp
tmp = ""
elif not c.isspace() or quoted_string:
tmp += c
cp += 1
if quoted_string:
raise RuntimeError("unterminated quoted string (conninfo:%s)" % cp)
return options
class DataIterator(object):
def __init__(self, obj, func):
self.obj = obj
self.func = func
def __iter__(self):
return self
def next(self):
retval = self.func(self.obj)
if retval == None:
raise StopIteration()
return retval
statement_number_lock = threading.Lock()
statement_number = 0
##
# This class represents a prepared statement. A prepared statement is
# pre-parsed on the server, which reduces the need to parse the query every
# time it is run. The statement can have parameters in the form of $1, $2, $3,
# etc. When parameters are used, the types of the parameters need to be
# specified when creating the prepared statement.
# <p>
# As of v1.01, instances of this class are thread-safe. This means that a
# single PreparedStatement can be accessed by multiple threads without the
# internal consistency of the statement being altered. However, the
# responsibility is on the client application to ensure that one thread reading
# from a statement isn't affected by another thread starting a new query with
# the same statement.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
#
# @param connection An instance of {@link Connection Connection}.
#
# @param statement The SQL statement to be represented, often containing
# parameters in the form of $1, $2, $3, etc.
#
# @param types Python type objects for each parameter in the SQL
# statement. For example, int, float, str.
class PreparedStatement(object):
##
# Determines the number of rows to read from the database server at once.
# Reading more rows increases performance at the cost of memory. The
# default value is 100 rows. The affect of this parameter is transparent.
# That is, the library reads more rows when the cache is empty
# automatically.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx. It is
# possible that implementation changes in the future could cause this
# parameter to be ignored.
row_cache_size = 100
def __init__(self, connection, statement, *types, **kwargs):
global statement_number
if connection == None or connection.c == None:
raise InterfaceError("connection not provided")
try:
statement_number_lock.acquire()
self._statement_number = statement_number
statement_number += 1
finally:
statement_number_lock.release()
self.c = connection.c
self._portal_name = None
self._statement_name = kwargs.get("statement_name", "pg8000_statement_%s" % self._statement_number)
self._row_desc = None
self._cached_rows = []
self._ongoing_row_count = 0
self._command_complete = True
self._parse_row_desc = self.c.parse(self._statement_name, statement, types)
self._lock = threading.RLock()
def close(self):
if self._statement_name != "": # don't close unnamed statement
self.c.close_statement(self._statement_name)
if self._portal_name != None:
self.c.close_portal(self._portal_name)
self._portal_name = None
row_description = property(lambda self: self._getRowDescription())
def _getRowDescription(self):
if self._row_desc == None:
return None
return self._row_desc.fields
##
# Run the SQL prepared statement with the given parameters.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def execute(self, *args, **kwargs):
self._lock.acquire()
try:
if not self._command_complete:
# cleanup last execute
self._cached_rows = []
self._ongoing_row_count = 0
if self._portal_name != None:
self.c.close_portal(self._portal_name)
self._command_complete = False
self._portal_name = "pg8000_portal_%s" % self._statement_number
self._row_desc, cmd = self.c.bind(self._portal_name, self._statement_name, args, self._parse_row_desc, kwargs.get("stream"))
if self._row_desc:
# We execute our cursor right away to fill up our cache. This
# prevents the cursor from being destroyed, apparently, by a rogue
# Sync between Bind and Execute. Since it is quite likely that
# data will be read from us right away anyways, this seems a safe
# move for now.
self._fill_cache()
else:
self._command_complete = True
self._ongoing_row_count = -1
if cmd != None and cmd.rows != None:
self._ongoing_row_count = cmd.rows
finally:
self._lock.release()
def _fill_cache(self):
self._lock.acquire()
try:
if self._cached_rows:
raise InternalError("attempt to fill cache that isn't empty")
end_of_data, rows = self.c.fetch_rows(self._portal_name, self.row_cache_size, self._row_desc)
self._cached_rows = rows
if end_of_data:
self._command_complete = True
finally:
self._lock.release()
def _fetch(self):
if not self._row_desc:
raise ProgrammingError("no result set")
self._lock.acquire()
try:
if not self._cached_rows:
if self._command_complete:
return None
self._fill_cache()
if self._command_complete and not self._cached_rows:
# fill cache tells us the command is complete, but yet we have
# no rows after filling our cache. This is a special case when
# a query returns no rows.
return None
row = self._cached_rows.pop(0)
self._ongoing_row_count += 1
return tuple(row)
finally:
self._lock.release()
##
# Return a count of the number of rows relevant to the executed statement.
# For a SELECT, this is the number of rows returned. For UPDATE or DELETE,
# this the number of rows affected. For INSERT, the number of rows
# inserted. This property may have a value of -1 to indicate that there
# was no row count.
# <p>
# During a result-set query (eg. SELECT, or INSERT ... RETURNING ...),
# accessing this property requires reading the entire result-set into
# memory, as reading the data to completion is the only way to determine
# the total number of rows. Avoid using this property in with
# result-set queries, as it may cause unexpected memory usage.
# <p>
# Stability: Added in v1.03, stability guaranteed for v1.xx.
row_count = property(lambda self: self._get_row_count())
def _get_row_count(self):
self._lock.acquire()
try:
if not self._command_complete:
end_of_data, rows = self.c.fetch_rows(self._portal_name, 0, self._row_desc)
self._cached_rows += rows
if end_of_data:
self._command_complete = True
else:
raise InternalError("fetch_rows(0) did not hit end of data")
return self._ongoing_row_count + len(self._cached_rows)
finally:
self._lock.release()
##
# Read a row from the database server, and return it in a dictionary
# indexed by column name/alias. This method will raise an error if two
# columns have the same name. Returns None after the last row.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def read_dict(self):
row = self._fetch()
if row == None:
return row
retval = {}
for i in range(len(self._row_desc.fields)):
col_name = self._row_desc.fields[i]['name']
if retval.has_key(col_name):
raise InterfaceError("cannot return dict of row when two columns have the same name (%r)" % (col_name,))
retval[col_name] = row[i]
return retval
##
# Read a row from the database server, and return it as a tuple of values.
# Returns None after the last row.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def read_tuple(self):
return self._fetch()
##
# Return an iterator for the output of this statement. The iterator will
# return a tuple for each row, in the same manner as {@link
# #PreparedStatement.read_tuple read_tuple}.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def iterate_tuple(self):
return DataIterator(self, PreparedStatement.read_tuple)
##
# Return an iterator for the output of this statement. The iterator will
# return a dict for each row, in the same manner as {@link
# #PreparedStatement.read_dict read_dict}.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def iterate_dict(self):
return DataIterator(self, PreparedStatement.read_dict)
class SimpleStatement(PreparedStatement):
"Internal wrapper to Simple Query protocol emulating a PreparedStatement"
# This should be used internally only for trivial queries
# (not a true Prepared Statement, in fact it can have multiple statements)
# See Simple Query Protocol limitations and trade-offs (send_simple_query)
row_cache_size = None
def __init__(self, connection, statement):
if connection == None or connection.c == None:
raise InterfaceError("connection not provided")
self.c = connection.c
self._row_desc = None
self._cached_rows = []
self._ongoing_row_count = -1
self._command_complete = True
self.statement = statement
self._lock = threading.RLock()
def close(self):
# simple query doesn't have portals
pass
def execute(self, *args, **kwargs):
"Run the SQL simple query stataments"
self._lock.acquire()
try:
self._row_desc, cmd_complete, self._cached_rows = \
self.c.send_simple_query(self.statement, kwargs.get("stream"))
self._command_complete = True
self._ongoing_row_count = -1
if cmd_complete is not None and cmd_complete.rows is not None:
self._ongoing_row_count = cmd_complete.rows
finally:
self._lock.release()
def _fill_cache(self):
# data rows are already fetched in _cached_rows
pass
def _fetch(self):
if not self._row_desc:
raise ProgrammingError("no result set")
self._lock.acquire()
try:
if not self._cached_rows:
return None
row = self._cached_rows.pop(0)
return tuple(row)
finally:
self._lock.release()
def _get_row_count(self):
return self._ongoing_row_count
##
# The Cursor class allows multiple queries to be performed concurrently with a
# single PostgreSQL connection. The Cursor object is implemented internally by
# using a {@link PreparedStatement PreparedStatement} object, so if you plan to
# use a statement multiple times, you might as well create a PreparedStatement
# and save a small amount of reparsing time.
# <p>
# As of v1.01, instances of this class are thread-safe. See {@link
# PreparedStatement PreparedStatement} for more information.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
#
# @param connection An instance of {@link Connection Connection}.
class Cursor(object):
def __init__(self, connection):
self.connection = connection
self._stmt = None
def require_stmt(func):
def retval(self, *args, **kwargs):
if self._stmt == None:
raise ProgrammingError("attempting to use unexecuted cursor")
return func(self, *args, **kwargs)
return retval
row_description = property(lambda self: self._getRowDescription())
def _getRowDescription(self):
if self._stmt == None:
return None
return self._stmt.row_description
##
# Run an SQL statement using this cursor. The SQL statement can have
# parameters in the form of $1, $2, $3, etc., which will be filled in by
# the additional arguments passed to this function.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
# @param query The SQL statement to execute.
def execute(self, query, *args, **kwargs):
if self.connection.is_closed:
raise ConnectionClosedError()
self.connection._unnamed_prepared_statement_lock.acquire()
try:
if kwargs.get("simple_query"):
# no arguments and no statement name,
# use PostgreSQL Simple Query Protocol
## print "SimpleQuery:", query
self._stmt = SimpleStatement(self.connection, query)
else:
# use PostgreSQL Extended Query Protocol
self._stmt = PreparedStatement(self.connection, query, statement_name="", *[{"type": type(x), "value": x} for x in args])
self._stmt.execute(*args, **kwargs)
finally:
self.connection._unnamed_prepared_statement_lock.release()
##
# Return a count of the number of rows currently being read. If possible,
# please avoid using this function. It requires reading the entire result
# set from the database to determine the number of rows being returned.
# <p>
# Stability: Added in v1.03, stability guaranteed for v1.xx.
# Implementation currently requires caching entire result set into memory,
# avoid using this property.
row_count = property(lambda self: self._get_row_count())
@require_stmt
def _get_row_count(self):
return self._stmt.row_count
##
# Read a row from the database server, and return it in a dictionary
# indexed by column name/alias. This method will raise an error if two
# columns have the same name. Returns None after the last row.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
@require_stmt
def read_dict(self):
return self._stmt.read_dict()
##
# Read a row from the database server, and return it as a tuple of values.
# Returns None after the last row.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
@require_stmt
def read_tuple(self):
return self._stmt.read_tuple()
##
# Return an iterator for the output of this statement. The iterator will
# return a tuple for each row, in the same manner as {@link
# #PreparedStatement.read_tuple read_tuple}.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
@require_stmt
def iterate_tuple(self):
return self._stmt.iterate_tuple()
##
# Return an iterator for the output of this statement. The iterator will
# return a dict for each row, in the same manner as {@link
# #PreparedStatement.read_dict read_dict}.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
@require_stmt
def iterate_dict(self):
return self._stmt.iterate_dict()
def close(self):
if self._stmt != None:
self._stmt.close()
self._stmt = None
##
# Return the fileno of the underlying socket for this cursor's connection.
# <p>
# Stability: Added in v1.07, stability guaranteed for v1.xx.
def fileno(self):
return self.connection.fileno()
##
# Poll the underlying socket for this cursor and sync if there is data waiting
# to be read. This has the effect of flushing asynchronous messages from the
# backend. Returns True if messages were read, False otherwise.
# <p>
# Stability: Added in v1.07, stability guaranteed for v1.xx.
def isready(self):
return self.connection.isready()
##
# This class represents a connection to a PostgreSQL database.
# <p>
# The database connection is derived from the {@link #Cursor Cursor} class,
# which provides a default cursor for running queries. It also provides
# transaction control via the 'begin', 'commit', and 'rollback' methods.
# Without beginning a transaction explicitly, all statements will autocommit to
# the database.
# <p>
# As of v1.01, instances of this class are thread-safe. See {@link
# PreparedStatement PreparedStatement} for more information.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
#
# @param user The username to connect to the PostgreSQL server with. This
# parameter is required.
#
# @keyparam host The hostname of the PostgreSQL server to connect with.
# Providing this parameter is necessary for TCP/IP connections. One of either
# host, or unix_sock, must be provided.
#
# @keyparam unix_sock The path to the UNIX socket to access the database
# through, for example, '/tmp/.s.PGSQL.5432'. One of either unix_sock or host
# must be provided. The port parameter will have no affect if unix_sock is
# provided.
#
# @keyparam port The TCP/IP port of the PostgreSQL server instance. This
# parameter defaults to 5432, the registered and common port of PostgreSQL
# TCP/IP servers.
#
# @keyparam database The name of the database instance to connect with. This
# parameter is optional, if omitted the PostgreSQL server will assume the
# database name is the same as the username.
#
# @keyparam password The user password to connect to the server with. This
# parameter is optional. If omitted, and the database server requests password
# based authentication, the connection will fail. On the other hand, if this
# parameter is provided and the database does not request password
# authentication, then the password will not be used.
#
# @keyparam socket_timeout Socket connect timeout measured in seconds.
# Defaults to 60 seconds.
#
# @keyparam ssl Use SSL encryption for TCP/IP socket. Defaults to False.
class Connection(Cursor):
def __init__(self, dsn="", user=None, host=None, unix_sock=None, port=5432, database=None, password=None, socket_timeout=60, ssl=False):
self._row_desc = None
if dsn:
# update connection parameters parsed of the conninfo dsn
opts = conninfo_parse(dsn)
database = opts.get("dbname", database)
user = opts.get("user", user)
password = opts.get("password", user)
host = opts.get("host", host)
port = int(opts.get("port", port))
ssl = opts.get("sslmode", 'disable') != 'disable'
try:
self.c = protocol.Connection(unix_sock=unix_sock, host=host, port=port, socket_timeout=socket_timeout, ssl=ssl)
self.c.authenticate(user, password=password, database=database)
except socket.error, e:
raise InterfaceError("communication error", e)
Cursor.__init__(self, self)
self._begin = PreparedStatement(self, "BEGIN TRANSACTION")
self._commit = PreparedStatement(self, "COMMIT TRANSACTION")
self._rollback = PreparedStatement(self, "ROLLBACK TRANSACTION")
self._unnamed_prepared_statement_lock = threading.RLock()
self.in_transaction = False
self.autocommit = False
##
# An event handler that is fired when NOTIFY occurs for a notification that
# has been LISTEN'd for. The value of this property is a
# util.MulticastDelegate. A callback can be added by using
# connection.NotificationReceived += SomeMethod. The method will be called
# with a single argument, an object that has properties: backend_pid,
# condition, and additional_info. Callbacks can be removed with the -=
# operator.
# <p>
# Stability: Added in v1.03, stability guaranteed for v1.xx.
NotificationReceived = property(
lambda self: getattr(self.c, "NotificationReceived"),
lambda self, value: setattr(self.c, "NotificationReceived", value)
)
##
# An event handler that is fired when the database server issues a notice.
# The value of this property is a util.MulticastDelegate. A callback can
# be added by using connection.NotificationReceived += SomeMethod. The
# method will be called with a single argument, an object that has
# properties: severity, code, msg, and possibly others (detail, hint,
# position, where, file, line, and routine). Callbacks can be removed with
# the -= operator.
# <p>
# Stability: Added in v1.03, stability guaranteed for v1.xx.
NoticeReceived = property(
lambda self: getattr(self.c, "NoticeReceived"),
lambda self, value: setattr(self.c, "NoticeReceived", value)
)
##
# An event handler that is fired when a runtime configuration option is
# changed on the server. The value of this property is a
# util.MulticastDelegate. A callback can be added by using
# connection.NotificationReceived += SomeMethod. Callbacks can be removed
# with the -= operator. The method will be called with a single argument,
# an object that has properties "key" and "value".
# <p>
# Stability: Added in v1.03, stability guaranteed for v1.xx.
ParameterStatusReceived = property(
lambda self: getattr(self.c, "ParameterStatusReceived"),
lambda self, value: setattr(self.c, "ParameterStatusReceived", value)
)
##
# Begins a new transaction.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def begin(self):
if self.is_closed:
raise ConnectionClosedError()
if self.autocommit:
return
self._begin.execute()
self.in_transaction = True
##
# Commits the running transaction.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def commit(self):
if self.is_closed:
raise ConnectionClosedError()
self._commit.execute()
self.in_transaction = False
##
# Rolls back the running transaction.
# <p>
# Stability: Added in v1.00, stability guaranteed for v1.xx.
def rollback(self):
if self.is_closed:
raise ConnectionClosedError()
self._rollback.execute()
self.in_transaction = False
##
# Closes an open connection.
def close(self):
if self.is_closed:
raise ConnectionClosedError()
self.c.close()
self.c = None
is_closed = property(lambda self: self.c == None)
##
# Return the fileno of the underlying socket for this connection.
# <p>
# Stability: Added in v1.07, stability guaranteed for v1.xx.
def fileno(self):
return self.c.fileno()
##
# Poll the underlying socket for this connection and sync if there is data
# waiting to be read. This has the effect of flushing asynchronous
# messages from the backend. Returns True if messages were read, False
# otherwise.
# <p>
# Stability: Added in v1.07, stability guaranteed for v1.xx.
def isready(self):
return self.c.isready()
##
# Return the server_version as reported from the connected server.
# Raises InterfaceError if no version has been reported from the server.
def server_version(self):
return self.c.server_version()
def encoding(self, encoding=None):
"Returns the client_encoding as reported from the connected server"
return self.c.encoding()
File diff suppressed because it is too large Load Diff
+708
View File
@@ -0,0 +1,708 @@
# vim: sw=4:expandtab:foldmethod=marker
#
# Copyright (c) 2007-2009, Mathieu Fenniak
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * The name of the author may not be used to endorse or promote products
# derived from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
__author__ = "Mathieu Fenniak"
import datetime
import decimal
import struct
import math
from errors import (NotSupportedError, ArrayDataParseError, InternalError,
ArrayContentEmptyError, ArrayContentNotHomogenousError,
ArrayContentNotSupportedError, ArrayDimensionsNotConsistentError)
try:
from pytz import utc
except ImportError:
ZERO = datetime.timedelta(0)
class UTC(datetime.tzinfo):
def utcoffset(self, dt):
return ZERO
def tzname(self, dt):
return "UTC"
def dst(self, dt):
return ZERO
utc = UTC()
class Bytea(str):
pass
class Interval(object):
def __init__(self, microseconds=0, days=0, months=0):
self.microseconds = microseconds
self.days = days
self.months = months
def _setMicroseconds(self, value):
if not isinstance(value, int) and not isinstance(value, long):
raise TypeError("microseconds must be an int or long")
elif not (min_int8 < value < max_int8):
raise OverflowError("microseconds must be representable as a 64-bit integer")
else:
self._microseconds = value
def _setDays(self, value):
if not isinstance(value, int) and not isinstance(value, long):
raise TypeError("days must be an int or long")
elif not (min_int4 < value < max_int4):
raise OverflowError("days must be representable as a 32-bit integer")
else:
self._days = value
def _setMonths(self, value):
if not isinstance(value, int) and not isinstance(value, long):
raise TypeError("months must be an int or long")
elif not (min_int4 < value < max_int4):
raise OverflowError("months must be representable as a 32-bit integer")
else:
self._months = value
microseconds = property(lambda self: self._microseconds, _setMicroseconds)
days = property(lambda self: self._days, _setDays)
months = property(lambda self: self._months, _setMonths)
def __repr__(self):
return "<Interval %s months %s days %s microseconds>" % (self.months, self.days, self.microseconds)
def __cmp__(self, other):
if other == None: return -1
c = cmp(self.months, other.months)
if c != 0: return c
c = cmp(self.days, other.days)
if c != 0: return c
return cmp(self.microseconds, other.microseconds)
def pg_type_info(typ):
value = None
if isinstance(typ, dict):
value = typ["value"]
typ = typ["type"]
data = py_types.get(typ)
if data == None:
raise NotSupportedError("type %r not mapped to pg type" % typ)
# permit the type data to be determined by the value, if provided
inspect_func = data.get("inspect")
if value != None and inspect_func != None:
data = inspect_func(value)
type_oid = data.get("typeoid")
if type_oid == None:
raise InternalError("type %r has no type_oid" % typ)
elif type_oid == -1:
# special case: NULL values
return type_oid, 0
# prefer bin, but go with whatever exists
if data.get("bin_out"):
format = 1
elif data.get("txt_out"):
format = 0
else:
raise InternalError("no conversion fuction for type %r" % typ)
return type_oid, format
def pg_value(value, fc, **kwargs):
typ = type(value)
data = py_types.get(typ)
if data == None:
raise NotSupportedError("type %r not mapped to pg type" % typ)
# permit the type conversion to be determined by the value, if provided
inspect_func = data.get("inspect")
if value != None and inspect_func != None:
data = inspect_func(value)
# special case: NULL values
if data.get("typeoid") == -1:
return None
if fc == 0:
func = data.get("txt_out")
elif fc == 1:
func = data.get("bin_out")
else:
raise InternalError("unrecognized format code %r" % fc)
if func == None:
raise NotSupportedError("type %r, format code %r not supported" % (typ, fc))
return func(value, **kwargs)
def py_type_info(description):
type_oid = description['type_oid']
data = pg_types.get(type_oid)
if data == None:
raise NotSupportedError("type oid %r not mapped to py type" % type_oid)
# prefer bin, but go with whatever exists
if data.get("bin_in"):
format = 1
elif data.get("txt_in"):
format = 0
else:
raise InternalError("no conversion fuction for type oid %r" % type_oid)
return format
def py_value(v, description, **kwargs):
if v == None:
# special case - NULL value
return None
type_oid = description['type_oid']
format = description['format']
data = pg_types.get(type_oid)
if data == None:
raise NotSupportedError("type oid %r not supported" % type_oid)
if format == 0:
func = data.get("txt_in")
elif format == 1:
func = data.get("bin_in")
else:
raise NotSupportedError("format code %r not supported" % format)
if func == None:
raise NotSupportedError("data response format %r, type %r not supported" % (format, type_oid))
return func(v, **kwargs)
def boolrecv(data, **kwargs):
return data == "\x01"
def boolsend(v, **kwargs):
if v:
return "\x01"
else:
return "\x00"
min_int2, max_int2 = -2 ** 15, 2 ** 15
min_int4, max_int4 = -2 ** 31, 2 ** 31
min_int8, max_int8 = -2 ** 63, 2 ** 63
def int_inspect(value):
if min_int2 < value < max_int2:
return {"typeoid": 21, "bin_out": int2send}
elif min_int4 < value < max_int4:
return {"typeoid": 23, "bin_out": int4send}
elif min_int8 < value < max_int8:
return {"typeoid": 20, "bin_out": int8send}
else:
return {"typeoid": 1700, "bin_out": numeric_send}
def int2recv(data, **kwargs):
return struct.unpack("!h", data)[0]
def int2send(v, **kwargs):
return struct.pack("!h", v)
def int4recv(data, **kwargs):
return struct.unpack("!i", data)[0]
def int4send(v, **kwargs):
return struct.pack("!i", v)
def int8recv(data, **kwargs):
return struct.unpack("!q", data)[0]
def int8send(v, **kwargs):
return struct.pack("!q", v)
def float4recv(data, **kwargs):
return struct.unpack("!f", data)[0]
def float8recv(data, **kwargs):
return struct.unpack("!d", data)[0]
def float8send(v, **kwargs):
return struct.pack("!d", v)
def datetime_inspect(value):
if value.tzinfo != None:
# send as timestamptz if timezone is provided
return {"typeoid": 1184, "bin_out": timestamptz_send}
else:
# otherwise send as timestamp
return {"typeoid": 1114, "bin_out": timestamp_send}
def timestamp_recv(data, integer_datetimes, **kwargs):
if integer_datetimes:
# data is 64-bit integer representing milliseconds since 2000-01-01
val = struct.unpack("!q", data)[0]
return datetime.datetime(2000, 1, 1) + datetime.timedelta(microseconds = val)
else:
# data is double-precision float representing seconds since 2000-01-01
val = struct.unpack("!d", data)[0]
return datetime.datetime(2000, 1, 1) + datetime.timedelta(seconds = val)
# return a timezone-aware datetime instance if we're reading from a
# "timestamp with timezone" type. The timezone returned will always be UTC,
# but providing that additional information can permit conversion to local.
def timestamptz_recv(data, **kwargs):
return timestamp_recv(data, **kwargs).replace(tzinfo=utc)
def timestamp_send(v, integer_datetimes, **kwargs):
delta = v - datetime.datetime(2000, 1, 1)
val = delta.microseconds + (delta.seconds * 1000000) + (delta.days * 86400000000)
if integer_datetimes:
# data is 64-bit integer representing milliseconds since 2000-01-01
return struct.pack("!q", val)
else:
# data is double-precision float representing seconds since 2000-01-01
return struct.pack("!d", val / 1000.0 / 1000.0)
def timestamptz_send(v, **kwargs):
# timestamps should be sent as UTC. If they have zone info,
# convert them.
return timestamp_send(v.astimezone(utc).replace(tzinfo=None), **kwargs)
def date_in(data, **kwargs):
year = int(data[0:4])
month = int(data[5:7])
day = int(data[8:10])
return datetime.date(year, month, day)
def date_out(v, **kwargs):
return v.isoformat()
def time_in(data, **kwargs):
hour = int(data[0:2])
minute = int(data[3:5])
sec = decimal.Decimal(data[6:])
return datetime.time(hour, minute, int(sec), int((sec - int(sec)) * 1000000))
def time_out(v, **kwargs):
return v.isoformat()
def numeric_in(data, **kwargs):
if data.find(".") == -1:
return int(data)
else:
return decimal.Decimal(data)
def numeric_recv(data, **kwargs):
num_digits, weight, sign, scale = struct.unpack("!hhhh", data[:8])
data = data[8:]
digits = struct.unpack("!" + ("h" * num_digits), data)
weight = decimal.Decimal(weight)
retval = 0
for d in digits:
d = decimal.Decimal(d)
retval += d * (10000 ** weight)
weight -= 1
if sign:
retval *= -1
return retval
DEC_DIGITS = 4
def numeric_send(d, **kwargs):
# This is a very straight port of src/backend/utils/adt/numeric.c set_var_from_str()
s = str(d)
pos = 0
sign = 0
if s[0] == '-':
sign = 0x4000 # NEG
pos=1
elif s[0] == '+':
sign = 0 # POS
pos=1
have_dp = False
decdigits = [0, 0, 0, 0]
dweight = -1
dscale = 0
for char in s[pos:]:
if char.isdigit():
decdigits.append(int(char))
if not have_dp:
dweight += 1
else:
dscale += 1
pos+=1
elif char == '.':
have_dp = True
pos+=1
else:
break
if len(s) > pos:
char = s[pos]
if char == 'e' or char == 'E':
pos+=1
exponent = int(s[pos:])
dweight += exponent
dscale -= exponent
if dscale < 0: dscale = 0
if dweight >= 0:
weight = (dweight + 1 + DEC_DIGITS - 1) / DEC_DIGITS - 1
else:
weight = -((-dweight - 1) / DEC_DIGITS + 1)
offset = (weight + 1) * DEC_DIGITS - (dweight + 1)
ndigits = (len(decdigits)-DEC_DIGITS + offset + DEC_DIGITS - 1) / DEC_DIGITS
i = DEC_DIGITS - offset
decdigits.extend([0, 0, 0])
ndigits_ = ndigits
digits = ''
while ndigits_ > 0:
# ifdef DEC_DIGITS == 4
digits += struct.pack("!h", ((decdigits[i] * 10 + decdigits[i + 1]) * 10 + decdigits[i + 2]) * 10 + decdigits[i + 3])
ndigits_ -= 1
i += DEC_DIGITS
# strip_var()
if ndigits == 0:
sign = 0x4000 # pos
weight = 0
# ----------
retval = struct.pack("!hhhh", ndigits, weight, sign, dscale) + digits
return retval
def numeric_out(v, **kwargs):
return str(v)
# PostgreSQL encodings:
# http://www.postgresql.org/docs/8.3/interactive/multibyte.html
# Python encodings:
# http://www.python.org/doc/2.4/lib/standard-encodings.html
#
# Commented out encodings don't require a name change between PostgreSQL and
# Python. If the py side is None, then the encoding isn't supported.
pg_to_py_encodings = {
# Not supported:
"mule_internal": None,
"euc_tw": None,
# Name fine as-is:
#"euc_jp",
#"euc_jis_2004",
#"euc_kr",
#"gb18030",
#"gbk",
#"johab",
#"sjis",
#"shift_jis_2004",
#"uhc",
#"utf8",
# Different name:
"euc_cn": "gb2312",
"iso_8859_5": "is8859_5",
"iso_8859_6": "is8859_6",
"iso_8859_7": "is8859_7",
"iso_8859_8": "is8859_8",
"koi8": "koi8_r",
"latin1": "iso8859-1",
"latin2": "iso8859_2",
"latin3": "iso8859_3",
"latin4": "iso8859_4",
"latin5": "iso8859_9",
"latin6": "iso8859_10",
"latin7": "iso8859_13",
"latin8": "iso8859_14",
"latin9": "iso8859_15",
"sql_ascii": "ascii",
"win866": "cp886",
"win874": "cp874",
"win1250": "cp1250",
"win1251": "cp1251",
"win1252": "cp1252",
"win1253": "cp1253",
"win1254": "cp1254",
"win1255": "cp1255",
"win1256": "cp1256",
"win1257": "cp1257",
"win1258": "cp1258",
}
def encoding_convert(encoding):
return pg_to_py_encodings.get(encoding.lower(), encoding)
def varcharin(data, client_encoding, **kwargs):
return unicode(data, encoding_convert(client_encoding))
def textout(v, client_encoding, **kwargs):
if isinstance(v, unicode):
return v.encode(encoding_convert(client_encoding))
else:
return v
def byteasend(v, **kwargs):
return str(v)
def bytearecv(data, **kwargs):
return Bytea(data)
# interval support does not provide a Python-usable interval object yet
def interval_recv(data, integer_datetimes, **kwargs):
if integer_datetimes:
microseconds, days, months = struct.unpack("!qii", data)
else:
seconds, days, months = struct.unpack("!dii", data)
microseconds = int(seconds * 1000 * 1000)
return Interval(microseconds, days, months)
def interval_send(data, integer_datetimes, **kwargs):
if integer_datetimes:
return struct.pack("!qii", data.microseconds, data.days, data.months)
else:
return struct.pack("!dii", data.microseconds / 1000.0 / 1000.0, data.days, data.months)
def array_recv(data, **kwargs):
dim, hasnull, typeoid = struct.unpack("!iii", data[:12])
data = data[12:]
# get type conversion method for typeoid
conversion = pg_types[typeoid]["bin_in"]
# Read dimension info
dim_lengths = []
element_count = 1
for idim in range(dim):
dim_len, dim_lbound = struct.unpack("!ii", data[:8])
data = data[8:]
dim_lengths.append(dim_len)
element_count *= dim_len
# Read all array values
array_values = []
for i in range(element_count):
if len(data):
element_len, = struct.unpack("!i", data[:4])
data = data[4:]
if element_len == -1:
array_values.append(None)
else:
array_values.append(conversion(data[:element_len], **kwargs))
data = data[element_len:]
if data != "":
raise ArrayDataParseError("unexpected data left over after array read")
# at this point, {{1,2,3},{4,5,6}}::int[][] looks like [1,2,3,4,5,6].
# go through the dimensions and fix up the array contents to match
# expected dimensions
for dim_length in reversed(dim_lengths[1:]):
val = []
while array_values:
val.append(array_values[:dim_length])
array_values = array_values[dim_length:]
array_values = val
return array_values
def array_inspect(value):
# Check if array has any values. If not, we can't determine the proper
# array typeoid.
first_element = array_find_first_element(value)
if first_element == None:
raise ArrayContentEmptyError("array has no values")
# supported array output
typ = type(first_element)
if issubclass(typ, int) or issubclass(typ, long):
# special int array support -- send as smallest possible array type
special_int_support = True
int2_ok, int4_ok, int8_ok = True, True, True
for v in array_flatten(value):
if v == None:
continue
if min_int2 < v < max_int2:
continue
int2_ok = False
if min_int4 < v < max_int4:
continue
int4_ok = False
if min_int8 < v < max_int8:
continue
int8_ok = False
if int2_ok:
array_typeoid = 1005 # INT2[]
elif int4_ok:
array_typeoid = 1007 # INT4[]
elif int8_ok:
array_typeoid = 1016 # INT8[]
else:
raise ArrayContentNotSupportedError("numeric not supported as array contents")
else:
special_int_support = False
array_typeoid = py_array_types.get(typ)
if array_typeoid == None:
raise ArrayContentNotSupportedError("type %r not supported as array contents" % typ)
# check for homogenous array
for v in array_flatten(value):
if v != None and not (isinstance(v, typ) or (typ == long and isinstance(v, int)) or (typ == int and isinstance(v, long))):
raise ArrayContentNotHomogenousError("not all array elements are of type %r" % typ)
# check that all array dimensions are consistent
array_check_dimensions(value)
type_data = py_types[typ]
if special_int_support:
if array_typeoid == 1005:
type_data = {"typeoid": 21, "bin_out": int2send}
elif array_typeoid == 1007:
type_data = {"typeoid": 23, "bin_out": int4send}
elif array_typeoid == 1016:
type_data = {"typeoid": 20, "bin_out": int8send}
else:
type_data = py_types[typ]
return {
"typeoid": array_typeoid,
"bin_out": array_send(type_data["typeoid"], type_data["bin_out"])
}
def array_find_first_element(arr):
for v in array_flatten(arr):
if v != None:
return v
return None
def array_flatten(arr):
for v in arr:
if isinstance(v, list):
for v2 in array_flatten(v):
yield v2
else:
yield v
def array_check_dimensions(arr):
v0 = arr[0]
if isinstance(v0, list):
req_len = len(v0)
req_inner_lengths = array_check_dimensions(v0)
for v in arr:
inner_lengths = array_check_dimensions(v)
if len(v) != req_len or inner_lengths != req_inner_lengths:
raise ArrayDimensionsNotConsistentError("array dimensions not consistent")
retval = [req_len]
retval.extend(req_inner_lengths)
return retval
else:
# make sure nothing else at this level is a list
for v in arr:
if isinstance(v, list):
raise ArrayDimensionsNotConsistentError("array dimensions not consistent")
return []
def array_has_null(arr):
for v in array_flatten(arr):
if v == None:
return True
return False
def array_dim_lengths(arr):
v0 = arr[0]
if isinstance(v0, list):
retval = [len(v0)]
retval.extend(array_dim_lengths(v0))
else:
return [len(arr)]
return retval
class array_send(object):
def __init__(self, typeoid, bin_out_func):
self.typeoid = typeoid
self.bin_out_func = bin_out_func
def __call__(self, arr, **kwargs):
has_null = array_has_null(arr)
dim_lengths = array_dim_lengths(arr)
data = struct.pack("!iii", len(dim_lengths), has_null, self.typeoid)
for i in dim_lengths:
data += struct.pack("!ii", i, 1)
for v in array_flatten(arr):
if v == None:
data += struct.pack("!i", -1)
else:
inner_data = self.bin_out_func(v, **kwargs)
data += struct.pack("!i", len(inner_data))
data += inner_data
return data
py_types = {
bool: {"typeoid": 16, "bin_out": boolsend},
int: {"inspect": int_inspect},
long: {"inspect": int_inspect},
str: {"typeoid": 25, "bin_out": textout},
unicode: {"typeoid": 25, "bin_out": textout},
float: {"typeoid": 701, "bin_out": float8send},
decimal.Decimal: {"typeoid": 1700, "bin_out": numeric_send},
Bytea: {"typeoid": 17, "bin_out": byteasend},
datetime.datetime: {"typeoid": 1114, "bin_out": timestamp_send, "inspect": datetime_inspect},
datetime.date: {"typeoid": 1082, "txt_out": date_out},
datetime.time: {"typeoid": 1083, "txt_out": time_out},
Interval: {"typeoid": 1186, "bin_out": interval_send},
type(None): {"typeoid": -1},
list: {"inspect": array_inspect},
}
# py type -> pg array typeoid
py_array_types = {
float: 1022,
bool: 1000,
str: 1009, # TEXT[]
unicode: 1009, # TEXT[]
decimal.Decimal: 1231, # NUMERIC[]
}
pg_types = {
16: {"bin_in": boolrecv},
17: {"bin_in": bytearecv},
19: {"bin_in": varcharin}, # name type
20: {"bin_in": int8recv},
21: {"bin_in": int2recv},
23: {"bin_in": int4recv, "txt_in": numeric_in},
25: {"bin_in": varcharin, "txt_in": varcharin}, # TEXT type
26: {"txt_in": numeric_in}, # oid type
142: {"bin_in": varcharin, "txt_in": varcharin}, # XML
194: {"bin_in": varcharin}, # "string representing an internal node tree"
700: {"bin_in": float4recv},
701: {"bin_in": float8recv},
705: {"txt_in": varcharin}, # UNKNOWN
829: {"txt_in": varcharin}, # MACADDR type
1000: {"bin_in": array_recv}, # BOOL[]
1003: {"bin_in": array_recv}, # NAME[]
1005: {"bin_in": array_recv}, # INT2[]
1007: {"bin_in": array_recv, "txt_in": varcharin}, # INT4[]
1009: {"bin_in": array_recv}, # TEXT[]
1014: {"bin_in": array_recv}, # CHAR[]
1015: {"bin_in": array_recv}, # VARCHAR[]
1016: {"bin_in": array_recv}, # INT8[]
1021: {"bin_in": array_recv}, # FLOAT4[]
1022: {"bin_in": array_recv}, # FLOAT8[]
1042: {"bin_in": varcharin}, # CHAR type
1043: {"bin_in": varcharin}, # VARCHAR type
1082: {"txt_in": date_in},
1083: {"txt_in": time_in},
1114: {"bin_in": timestamp_recv},
1184: {"bin_in": timestamptz_recv}, # timestamp w/ tz
1186: {"bin_in": interval_recv},
1231: {"bin_in": array_recv}, # NUMERIC[]
1263: {"bin_in": array_recv}, # cstring[]
1700: {"bin_in": numeric_recv},
2275: {"bin_in": varcharin}, # cstring
}
+20
View File
@@ -0,0 +1,20 @@
class MulticastDelegate(object):
def __init__(self):
self.delegates = []
def __iadd__(self, delegate):
self.add(delegate)
return self
def add(self, delegate):
self.delegates.append(delegate)
def __isub__(self, delegate):
self.delegates.remove(delegate)
return self
def __call__(self, *args, **kwargs):
for d in self.delegates:
d(*args, **kwargs)