+ # Stability: Part of the DBAPI 2.0 specification.
+ def execute(self, operation, args=None, stream=None):
+ """Executes a database operation. Parameters may be provided as a
+ sequence, or as a mapping, depending upon the value of
+ :data:`pg8000.paramstyle`.
+
+ This method is part of the `DBAPI 2.0 specification
+
+ # Stability: Added in v1.03, stability guaranteed for v1.xx.
+ self.NoticeReceived = MulticastDelegate()
+
+ ##
+ # An event handler that is fired when a runtime configuration option is
+ # changed on the server. The value of this property is a
+ # MulticastDelegate. A callback can be added by using
+ # connection.NotificationReceived += SomeMethod. Callbacks can be
+ # removed with the -= operator. The method will be called with a single
+ # argument, an object that has properties "key" and "value".
+ #
+ # Stability: Added in v1.03, stability guaranteed for v1.xx.
+ self.ParameterStatusReceived = MulticastDelegate()
+
+ ##
+ # An event handler that is fired when NOTIFY occurs for a notification
+ # that has been LISTEN'd for. The value of this property is a
+ # MulticastDelegate. A callback can be added by using
+ # connection.NotificationReceived += SomeMethod. The method will be
+ # called with a single argument, an object that has properties:
+ # backend_pid, condition, and additional_info. Callbacks can be
+ # removed with the -= operator.
+ #
+ # Stability: Added in v1.03, stability guaranteed for v1.xx.
+ self.NotificationReceived = MulticastDelegate()
+
+ self.ParameterStatusReceived += self.handle_PARAMETER_STATUS
+
+ def text_out(v):
+ return v.encode(self._client_encoding)
+
+ def time_out(v):
+ return v.isoformat().encode(self._client_encoding)
+
+ def date_out(v):
+ if v == datetime.date.max:
+ return 'infinity'.encode(self._client_encoding)
+ elif v == datetime.date.min:
+ return '-infinity'.encode(self._client_encoding)
+ else:
+ return v.isoformat().encode(self._client_encoding)
+
+ def unknown_out(v):
+ return str(v).encode(self._client_encoding)
+
+ trans_tab = dict(zip(map(ord, u('{}')), u('[]')))
+ glbls = {'Decimal': Decimal}
+
+ def array_in(data, idx, length):
+ arr = []
+ prev_c = None
+ for c in data[idx:idx+length].decode(
+ self._client_encoding).translate(
+ trans_tab).replace(u('NULL'), u('None')):
+ if c not in ('[', ']', ',', 'N') and prev_c in ('[', ','):
+ arr.extend("Decimal('")
+ elif c in (']', ',') and prev_c not in ('[', ']', ',', 'e'):
+ arr.extend("')")
+
+ arr.append(c)
+ prev_c = c
+ return eval(''.join(arr), glbls)
+
+ def array_recv(data, idx, length):
+ final_idx = idx + length
+ dim, hasnull, typeoid = iii_unpack(data, idx)
+ idx += 12
+
+ # get type conversion method for typeoid
+ conversion = self.pg_types[typeoid][1]
+
+ # Read dimension info
+ dim_lengths = []
+ for i in range(dim):
+ dim_lengths.append(ii_unpack(data, idx)[0])
+ idx += 8
+
+ # Read all array values
+ values = []
+ while idx < final_idx:
+ element_len, = i_unpack(data, idx)
+ idx += 4
+ if element_len == -1:
+ values.append(None)
+ else:
+ values.append(conversion(data, idx, element_len))
+ idx += element_len
+
+ # at this point, {{1,2,3},{4,5,6}}::int[][] looks like
+ # [1,2,3,4,5,6]. go through the dimensions and fix up the array
+ # contents to match expected dimensions
+ for length in reversed(dim_lengths[1:]):
+ values = list(map(list, zip(*[iter(values)] * length)))
+ return values
+
+ def vector_in(data, idx, length):
+ return eval('[' + data[idx:idx+length].decode(
+ self._client_encoding).replace(' ', ',') + ']')
+
+ if PY2:
+ def text_recv(data, offset, length):
+ return unicode( # noqa
+ data[offset: offset + length], self._client_encoding)
+
+ def bool_recv(d, o, l):
+ return d[o] == "\x01"
+
+ else:
+ def text_recv(data, offset, length):
+ return str(
+ data[offset: offset + length], self._client_encoding)
+
+ def bool_recv(data, offset, length):
+ return data[offset] == 1
+
+ def time_in(data, offset, length):
+ hour = int(data[offset:offset + 2])
+ minute = int(data[offset + 3:offset + 5])
+ sec = Decimal(
+ data[offset + 6:offset + length].decode(self._client_encoding))
+ return datetime.time(
+ hour, minute, int(sec), int((sec - int(sec)) * 1000000))
+
+ def date_in(data, offset, length):
+ year_str = data[offset:offset + 4].decode(self._client_encoding)
+ if year_str == 'infi':
+ return datetime.date.max
+ elif year_str == '-inf':
+ return datetime.date.min
+ else:
+ return datetime.date(
+ int(year_str), int(data[offset + 5:offset + 7]),
+ int(data[offset + 8:offset + 10]))
+
+ def numeric_in(data, offset, length):
+ return Decimal(
+ data[offset: offset + length].decode(self._client_encoding))
+
+ def numeric_out(d):
+ return str(d).encode(self._client_encoding)
+
+ self.pg_types = defaultdict(
+ lambda: (FC_TEXT, text_recv), {
+ 16: (FC_BINARY, bool_recv), # boolean
+ 17: (FC_BINARY, bytea_recv), # bytea
+ 19: (FC_BINARY, text_recv), # name type
+ 20: (FC_BINARY, int8_recv), # int8
+ 21: (FC_BINARY, int2_recv), # int2
+ 22: (FC_TEXT, vector_in), # int2vector
+ 23: (FC_BINARY, int4_recv), # int4
+ 25: (FC_BINARY, text_recv), # TEXT type
+ 26: (FC_TEXT, int_in), # oid
+ 28: (FC_TEXT, int_in), # xid
+ 700: (FC_BINARY, float4_recv), # float4
+ 701: (FC_BINARY, float8_recv), # float8
+ 705: (FC_BINARY, text_recv), # unknown
+ 829: (FC_TEXT, text_recv), # MACADDR type
+ 1000: (FC_BINARY, array_recv), # BOOL[]
+ 1003: (FC_BINARY, array_recv), # NAME[]
+ 1005: (FC_BINARY, array_recv), # INT2[]
+ 1007: (FC_BINARY, array_recv), # INT4[]
+ 1009: (FC_BINARY, array_recv), # TEXT[]
+ 1014: (FC_BINARY, array_recv), # CHAR[]
+ 1015: (FC_BINARY, array_recv), # VARCHAR[]
+ 1016: (FC_BINARY, array_recv), # INT8[]
+ 1021: (FC_BINARY, array_recv), # FLOAT4[]
+ 1022: (FC_BINARY, array_recv), # FLOAT8[]
+ 1042: (FC_BINARY, text_recv), # CHAR type
+ 1043: (FC_BINARY, text_recv), # VARCHAR type
+ 1082: (FC_TEXT, date_in), # date
+ 1083: (FC_TEXT, time_in),
+ 1114: (FC_BINARY, timestamp_recv_float), # timestamp w/ tz
+ 1184: (FC_BINARY, timestamptz_recv_float),
+ 1186: (FC_BINARY, interval_recv_integer),
+ 1231: (FC_TEXT, array_in), # NUMERIC[]
+ 1263: (FC_BINARY, array_recv), # cstring[]
+ 1700: (FC_TEXT, numeric_in), # NUMERIC
+ 2275: (FC_BINARY, text_recv), # cstring
+ 2950: (FC_BINARY, uuid_recv), # uuid
+ })
+
+ self.py_types = {
+ type(None): (-1, FC_BINARY, null_send), # null
+ bool: (16, FC_BINARY, bool_send),
+ int: (705, FC_TEXT, unknown_out),
+ float: (701, FC_BINARY, d_pack), # float8
+ str: (705, FC_TEXT, text_out), # unknown
+ datetime.date: (1082, FC_TEXT, date_out), # date
+ datetime.time: (1083, FC_TEXT, time_out), # time
+ 1114: (1114, FC_BINARY, timestamp_send_integer), # timestamp
+ # timestamp w/ tz
+ 1184: (1184, FC_BINARY, timestamptz_send_integer),
+ datetime.timedelta: (1186, FC_BINARY, interval_send_integer),
+ Interval: (1186, FC_BINARY, interval_send_integer),
+ Decimal: (1700, FC_TEXT, numeric_out), # Decimal
+ UUID: (2950, FC_BINARY, uuid_send), # uuid
+ }
+
+ self.inspect_funcs = {
+ datetime.datetime: self.inspect_datetime,
+ list: self.array_inspect,
+ tuple: self.array_inspect,
+ }
+
+ if PY2:
+ self.py_types[pg8000.Bytea] = (17, FC_BINARY, bytea_send) # bytea
+ self.py_types[text_type] = (705, FC_TEXT, text_out) # unknown
+
+ self.py_types[long] = (705, FC_TEXT, unknown_out) # noqa
+ else:
+ self.py_types[bytes] = (17, FC_BINARY, bytea_send) # bytea
+
+ try:
+ from ipaddress import (
+ ip_address, IPv4Address, IPv6Address, ip_network, IPv4Network,
+ IPv6Network)
+
+ def inet_out(v):
+ return str(v).encode(self._client_encoding)
+
+ def inet_in(data, offset, length):
+ inet_str = data[offset: offset + length].decode(
+ self._client_encoding)
+ if '/' in inet_str:
+ return ip_network(inet_str, False)
+ else:
+ return ip_address(inet_str)
+
+ self.py_types[IPv4Address] = (869, FC_TEXT, inet_out) # inet
+ self.py_types[IPv6Address] = (869, FC_TEXT, inet_out) # inet
+ self.py_types[IPv4Network] = (869, FC_TEXT, inet_out) # inet
+ self.py_types[IPv6Network] = (869, FC_TEXT, inet_out) # inet
+ self.pg_types[869] = (FC_TEXT, inet_in) # inet
+ except ImportError:
+ pass
+
+ self.message_types = {
+ NOTICE_RESPONSE: self.handle_NOTICE_RESPONSE,
+ AUTHENTICATION_REQUEST: self.handle_AUTHENTICATION_REQUEST,
+ PARAMETER_STATUS: self.handle_PARAMETER_STATUS,
+ BACKEND_KEY_DATA: self.handle_BACKEND_KEY_DATA,
+ READY_FOR_QUERY: self.handle_READY_FOR_QUERY,
+ ROW_DESCRIPTION: self.handle_ROW_DESCRIPTION,
+ ERROR_RESPONSE: self.handle_ERROR_RESPONSE,
+ DATA_ROW: self.handle_DATA_ROW,
+ COMMAND_COMPLETE: self.handle_COMMAND_COMPLETE,
+ PARSE_COMPLETE: self.handle_PARSE_COMPLETE,
+ BIND_COMPLETE: self.handle_BIND_COMPLETE,
+ CLOSE_COMPLETE: self.handle_CLOSE_COMPLETE,
+ PORTAL_SUSPENDED: self.handle_PORTAL_SUSPENDED,
+ NO_DATA: self.handle_NO_DATA,
+ PARAMETER_DESCRIPTION: self.handle_PARAMETER_DESCRIPTION,
+ NOTIFICATION_RESPONSE: self.handle_NOTIFICATION_RESPONSE,
+ COPY_DONE: self.handle_COPY_DONE,
+ COPY_DATA: self.handle_COPY_DATA,
+ COPY_IN_RESPONSE: self.handle_COPY_IN_RESPONSE,
+ COPY_OUT_RESPONSE: self.handle_COPY_OUT_RESPONSE}
+
+ # Int32 - Message length, including self.
+ # Int32(196608) - Protocol version number. Version 3.0.
+ # Any number of key/value pairs, terminated by a zero byte:
+ # String - A parameter name (user, database, or options)
+ # String - Parameter value
+ protocol = 196608
+ val = bytearray(i_pack(protocol) + b("user\x00"))
+ val.extend(user.encode("ascii") + NULL_BYTE)
+ if database is not None:
+ val.extend(
+ b("database\x00") + database.encode("ascii") + NULL_BYTE)
+ val.append(0)
+ self._write(i_pack(len(val) + 4))
+ self._write(val)
+ self._flush()
+
+ self._cursor = self.cursor()
+ try:
+ self._lock.acquire()
+ code = self.error = None
+ while code not in (READY_FOR_QUERY, ERROR_RESPONSE):
+ code, data_len = ci_unpack(self._read(5))
+ self.message_types[code](self._read(data_len - 4), None)
+ if self.error is not None:
+ raise self.error
+ except:
+ self._close()
+ raise
+ finally:
+ self._lock.release()
+
+ self.in_transaction = False
+ self.notifies = []
+ self.notifies_lock = threading.Lock()
+
+ def handle_ERROR_RESPONSE(self, data, ps):
+ msg_dict = data_into_dict(data)
+ if msg_dict[RESPONSE_CODE] == "28000":
+ self.error = InterfaceError("md5 password authentication failed")
+ else:
+ self.error = ProgrammingError(
+ msg_dict[RESPONSE_SEVERITY], msg_dict[RESPONSE_CODE],
+ msg_dict[RESPONSE_MSG])
+
+ def handle_CLOSE_COMPLETE(self, data, ps):
+ pass
+
+ def handle_PARSE_COMPLETE(self, data, ps):
+ # Byte1('1') - Identifier.
+ # Int32(4) - Message length, including self.
+ pass
+
+ def handle_BIND_COMPLETE(self, data, ps):
+ pass
+
+ def handle_PORTAL_SUSPENDED(self, data, cursor):
+ cursor.portal_suspended = True
+
+ def handle_PARAMETER_DESCRIPTION(self, data, ps):
+ # Well, we don't really care -- we're going to send whatever we
+ # want and let the database deal with it. But thanks anyways!
+
+ # count = h_unpack(data)[0]
+ # type_oids = unpack_from("!" + "i" * count, data, 2)
+ pass
+
+ def handle_COPY_DONE(self, data, ps):
+ self._copy_done = True
+
+ def handle_COPY_OUT_RESPONSE(self, data, ps):
+ # Int8(1) - 0 textual, 1 binary
+ # Int16(2) - Number of columns
+ # Int16(N) - Format codes for each column (0 text, 1 binary)
+
+ is_binary, num_cols = bh_unpack(data)
+ # column_formats = unpack_from('!' + 'h' * num_cols, data, 3)
+ if ps.stream is None:
+ raise InterfaceError(
+ "An output stream is required for the COPY OUT response.")
+
+ def handle_COPY_DATA(self, data, ps):
+ ps.stream.write(data)
+
+ def handle_COPY_IN_RESPONSE(self, data, ps):
+ # Int16(2) - Number of columns
+ # Int16(N) - Format codes for each column (0 text, 1 binary)
+ is_binary, num_cols = bh_unpack(data)
+ # column_formats = unpack_from('!' + 'h' * num_cols, data, 3)
+ assert self._lock.locked()
+ if ps.stream is None:
+ raise InterfaceError(
+ "An input stream is required for the COPY IN response.")
+
+ if PY2:
+ while True:
+ data = ps.stream.read(8192)
+ if not data:
+ break
+ self._write(COPY_DATA + i_pack(len(data) + 4))
+ self._write(data)
+ self._flush()
+ else:
+ bffr = bytearray(8192)
+ while True:
+ bytes_read = ps.stream.readinto(bffr)
+ if bytes_read == 0:
+ break
+ self._write(COPY_DATA + i_pack(bytes_read + 4))
+ self._write(bffr[:bytes_read])
+ self._flush()
+
+ # Send CopyDone
+ # Byte1('c') - Identifier.
+ # Int32(4) - Message length, including self.
+ self._write(COPY_DONE_MSG)
+ self._write(SYNC_MSG)
+ self._flush()
+
+ def handle_NOTIFICATION_RESPONSE(self, data, ps):
+ self.NotificationReceived(data)
+ ##
+ # A message sent if this connection receives a NOTIFY that it was
+ # LISTENing for.
+ #
+ # Stability: Added in pg8000 v1.03. When limited to accessing
+ # properties from a notification event dispatch, stability is
+ # guaranteed for v1.xx.
+ backend_pid = i_unpack(data)[0]
+ idx = 4
+ null = data.find(NULL_BYTE, idx) - idx
+ condition = data[idx:idx + null].decode("ascii")
+ idx += null + 1
+ null = data.find(NULL_BYTE, idx) - idx
+ # additional_info = data[idx:idx + null]
+
+ # psycopg2 compatible notification interface
+ try:
+ self.notifies_lock.acquire()
+ self.notifies.append((backend_pid, condition))
+ finally:
+ self.notifies_lock.release()
+
+ def cursor(self):
+ """Creates a :class:`Cursor` object bound to this
+ connection.
+
+ This function is part of the `DBAPI 2.0 specification
+
-# Unlike the DBAPI specification, this value is not constant. It can be
-# changed to any standard paramstyle value (ie. qmark, numeric, named, format,
-# and pyformat).
-paramstyle = 'format' # paramstyle can be changed to any DB-API paramstyle
-
-def convert_paramstyle(src_style, query, args):
- # I don't see any way to avoid scanning the query string char by char,
- # so we might as well take that careful approach and create a
- # state-based scanner. We'll use int variables for the state.
- # 0 -- outside quoted string
- # 1 -- inside single-quote string '...'
- # 2 -- inside quoted identifier "..."
- # 3 -- inside escaped single-quote string, E'...'
- state = 0
- output_query = ""
- output_args = []
- if src_style == "numeric":
- output_args = args
- elif src_style in ("pyformat", "named"):
- mapping_to_idx = {}
- i = 0
- while 1:
- if i == len(query):
- break
- c = query[i]
- # print "begin loop", repr(i), repr(c), repr(state)
- if state == 0:
- if c == "'":
- i += 1
- output_query += c
- state = 1
- elif c == '"':
- i += 1
- output_query += c
- state = 2
- elif c == 'E':
- # check for escaped single-quote string
- i += 1
- if i < len(query) and i > 1 and query[i] == "'":
- i += 1
- output_query += "E'"
- state = 3
- else:
- output_query += c
- elif src_style == "qmark" and c == "?":
- i += 1
- param_idx = len(output_args)
- if param_idx == len(args):
- raise QueryParameterIndexError("too many parameter fields, not enough parameters")
- output_args.append(args[param_idx])
- output_query += "$" + str(param_idx + 1)
- elif src_style == "numeric" and c == ":":
- i += 1
- if i < len(query) and i > 1 and query[i].isdigit():
- output_query += "$" + query[i]
- i += 1
- else:
- raise QueryParameterParseError("numeric parameter : does not have numeric arg")
- elif src_style == "named" and c == ":":
- name = ""
- while 1:
- i += 1
- if i == len(query):
- break
- c = query[i]
- if c.isalnum() or c == '_':
- name += c
- else:
- break
- if name == "":
- raise QueryParameterParseError("empty name of named parameter")
- idx = mapping_to_idx.get(name)
- if idx == None:
- idx = len(output_args)
- output_args.append(args[name])
- idx += 1
- mapping_to_idx[name] = idx
- output_query += "$" + str(idx)
- elif src_style == "format" and c == "%":
- i += 1
- if i < len(query) and i > 1:
- if query[i] == "s":
- param_idx = len(output_args)
- if param_idx == len(args):
- raise QueryParameterIndexError("too many parameter fields, not enough parameters")
- output_args.append(args[param_idx])
- output_query += "$" + str(param_idx + 1)
- elif query[i] == "%":
- output_query += "%"
- else:
- raise QueryParameterParseError("Only %s and %% are supported")
- i += 1
- else:
- raise QueryParameterParseError("format parameter % does not have format code")
- elif src_style == "pyformat" and c == "%":
- i += 1
- if i < len(query) and i > 1:
- if query[i] == "(":
- i += 1
- # begin mapping name
- end_idx = query.find(')', i)
- if end_idx == -1:
- raise QueryParameterParseError("began pyformat dict read, but couldn't find end of name")
- else:
- name = query[i:end_idx]
- i = end_idx + 1
- if i < len(query) and query[i] == "s":
- i += 1
- idx = mapping_to_idx.get(name)
- if idx == None:
- idx = len(output_args)
- output_args.append(args[name])
- idx += 1
- mapping_to_idx[name] = idx
- output_query += "$" + str(idx)
- else:
- raise QueryParameterParseError("format not specified or not supported (only %(...)s supported)")
- elif query[i] == "%":
- output_query += "%"
- elif query[i] == "s":
- # we have a %s in a pyformat query string. Assume
- # support for format instead.
- i -= 1
- src_style = "format"
- else:
- raise QueryParameterParseError("Only %(name)s, %s and %% are supported")
- else:
- i += 1
- output_query += c
- elif state == 1:
- output_query += c
- i += 1
- if c == "'":
- # Could be a double ''
- if i < len(query) and query[i] == "'":
- # is a double quote.
- output_query += query[i]
- i += 1
- else:
- state = 0
- elif src_style in ("pyformat","format") and c == "%":
- # hm... we're only going to support an escaped percent sign
- if i < len(query):
- if query[i] == "%":
- # good. We already output the first percent sign.
- i += 1
- else:
- raise QueryParameterParseError("'%" + query[i] + "' not supported in quoted string")
- elif state == 2:
- output_query += c
- i += 1
- if c == '"':
- state = 0
- elif src_style in ("pyformat","format") and c == "%":
- # hm... we're only going to support an escaped percent sign
- if i < len(query):
- if query[i] == "%":
- # good. We already output the first percent sign.
- i += 1
- else:
- raise QueryParameterParseError("'%" + query[i] + "' not supported in quoted string")
- elif state == 3:
- output_query += c
- i += 1
- if c == "\\":
- # check for escaped single-quote
- if i < len(query) and query[i] == "'":
- output_query += "'"
- i += 1
- elif c == "'":
- state = 0
- elif src_style in ("pyformat","format") and c == "%":
- # hm... we're only going to support an escaped percent sign
- if i < len(query):
- if query[i] == "%":
- # good. We already output the first percent sign.
- i += 1
- else:
- raise QueryParameterParseError("'%" + query[i] + "' not supported in quoted string")
-
- return output_query, tuple(output_args)
-
-
-def require_open_cursor(fn):
- def _fn(self, *args, **kwargs):
- if self.cursor == None:
- raise CursorClosedError()
- return fn(self, *args, **kwargs)
- return _fn
-
-##
-# The class of object returned by the {@link #ConnectionWrapper.cursor cursor method}.
-class CursorWrapper(object):
- def __init__(self, conn, connection):
- self.cursor = interface.Cursor(conn)
- self.arraysize = 1
- self._connection = connection
- self._override_rowcount = None
-
- ##
- # This read-only attribute returns a reference to the connection object on
- # which the cursor was created.
- #
- # Stability: Part of a DBAPI 2.0 extension. A warning "DB-API extension
- # cursor.connection used" will be fired.
- connection = property(lambda self: self._getConnection())
-
- def _getConnection(self):
- warn("DB-API extension cursor.connection used", stacklevel=3)
- return self._connection
-
- ##
- # This read-only attribute specifies the number of rows that the last
- # .execute*() produced (for DQL statements like 'select') or affected (for
- # DML statements like 'update' or 'insert').
- #
- # The attribute is -1 in case no .execute*() has been performed on the
- # cursor or the rowcount of the last operation is cannot be determined by
- # the interface.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- rowcount = property(lambda self: self._getRowCount())
-
- @require_open_cursor
- def _getRowCount(self):
- if self._override_rowcount != None:
- return self._override_rowcount
- return self.cursor.row_count
-
- ##
- # This read-only attribute is a sequence of 7-item sequences. Each value
- # contains information describing one result column. The 7 items returned
- # for each column are (name, type_code, display_size, internal_size,
- # precision, scale, null_ok). Only the first two values are provided by
- # this interface implementation.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- description = property(lambda self: self._getDescription())
-
- @require_open_cursor
- def _getDescription(self):
- if self.cursor.row_description == None:
- return None
- columns = []
- for col in self.cursor.row_description:
- columns.append((col["name"], col["type_oid"], None, None, None, None, None))
- return columns
-
- ##
- # Executes a database operation. Parameters may be provided as a sequence
- # or mapping and will be bound to variables in the operation.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_cursor
- def execute(self, operation, args=()):
- if not self._connection.in_transaction:
- self._connection.begin()
- self._override_rowcount = None
- self._execute(operation, args)
-
- def _execute(self, operation, args=()):
- new_query, new_args = convert_paramstyle(paramstyle, operation, args)
- try:
- self.cursor.execute(new_query, *new_args)
- except ConnectionClosedError:
- # can't rollback in this case
- raise
- except:
- # any error will rollback the transaction to-date
- self._connection.rollback()
- raise
-
- def copy_from(self, fileobj, table=None, sep='\t', null=None, query=None):
- if query == None:
- if table == None:
- raise CopyQueryOrTableRequiredError()
- query = "COPY %s FROM stdout DELIMITER '%s'" % (table, sep)
- if null is not None:
- query += " NULL '%s'" % (null,)
- self.copy_execute(fileobj, query)
-
- def copy_to(self, fileobj, table=None, sep='\t', null=None, query=None):
- if query == None:
- if table == None:
- raise CopyQueryOrTableRequiredError()
- query = "COPY %s TO stdout DELIMITER '%s'" % (table, sep)
- if null is not None:
- query += " NULL '%s'" % (null,)
- self.copy_execute(fileobj, query)
-
- @require_open_cursor
- def copy_execute(self, fileobj, query):
- try:
- self.cursor.execute(query, stream=fileobj)
- except ConnectionClosedError:
- # can't rollback in this case
- raise
- except:
- # any error will rollback the transaction to-date
- import traceback; traceback.print_exc()
- self._connection.rollback()
- raise
-
- ##
- # Prepare a database operation and then execute it against all parameter
- # sequences or mappings provided.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_cursor
- def executemany(self, operation, parameter_sets):
- if not self._connection.in_transaction:
- self._connection.begin()
- self._override_rowcount = 0
- for parameters in parameter_sets:
- self._execute(operation, parameters)
- if self.cursor.row_count == -1 or self._override_rowcount == -1:
- self._override_rowcount = -1
- else:
- self._override_rowcount += self.cursor.row_count
-
- ##
- # Fetch the next row of a query result set, returning a single sequence, or
- # None when no more data is available.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_cursor
- def fetchone(self):
- return self.cursor.read_tuple()
-
- ##
- # Fetch the next set of rows of a query result, returning a sequence of
- # sequences. An empty sequence is returned when no more rows are
- # available.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- # @param size The number of rows to fetch when called. If not provided,
- # the arraysize property value is used instead.
- def fetchmany(self, size=None):
- if size == None:
- size = self.arraysize
- rows = []
- for i in range(size):
- value = self.fetchone()
- if value == None:
- break
- rows.append(value)
- return rows
-
- ##
- # Fetch all remaining rows of a query result, returning them as a sequence
- # of sequences.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_cursor
- def fetchall(self):
- return tuple(self.cursor.iterate_tuple())
-
- ##
- # Close the cursor.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_cursor
- def close(self):
- self.cursor.close()
- self.cursor = None
- self._override_rowcount = None
-
- def next(self):
- warn("DB-API extension cursor.next() used", stacklevel=2)
- retval = self.fetchone()
- if retval == None:
- raise StopIteration()
- return retval
-
- def __iter__(self):
- warn("DB-API extension cursor.__iter__() used", stacklevel=2)
- return self
-
- def setinputsizes(self, sizes):
- pass
-
- def setoutputsize(self, size, column=None):
- pass
-
- @require_open_cursor
- def fileno(self):
- return self.cursor.fileno()
-
- @require_open_cursor
- def isready(self):
- return self.cursor.isready()
-
-def require_open_connection(fn):
- def _fn(self, *args, **kwargs):
- if self.conn == None:
- raise ConnectionClosedError()
- return fn(self, *args, **kwargs)
- return _fn
-
-##
-# The class of object returned by the {@link #connect connect method}.
-class ConnectionWrapper(object):
- # DBAPI Extension: supply exceptions as attributes on the connection
- Warning = property(lambda self: self._getError(Warning))
- Error = property(lambda self: self._getError(Error))
- InterfaceError = property(lambda self: self._getError(InterfaceError))
- DatabaseError = property(lambda self: self._getError(DatabaseError))
- OperationalError = property(lambda self: self._getError(OperationalError))
- IntegrityError = property(lambda self: self._getError(IntegrityError))
- InternalError = property(lambda self: self._getError(InternalError))
- ProgrammingError = property(lambda self: self._getError(ProgrammingError))
- NotSupportedError = property(lambda self: self._getError(NotSupportedError))
-
- def _getError(self, error):
- warn("DB-API extension connection.%s used" % error.__name__, stacklevel=3)
- return error
-
- @property
- def in_transaction(self):
- if self.conn:
- return self.conn.in_transaction
- return False
-
- def __init__(self, **kwargs):
- self.conn = interface.Connection(**kwargs)
- self.notifies = []
- self.notifies_lock = threading.Lock()
- self.conn.NotificationReceived += self._notificationReceived
- # Two Phase Commit internal attributes:
- self.__tpc_xid = None
- self.__tpc_prepared = None
-
- def set_autocommit(self, state):
- if self.conn.in_transaction and state and not self.conn.autocommit:
- warn("enabling autocommit in an open transaction!")
- self.conn.autocommit = state
-
- def get_autocommit(self):
- return self.conn.autocommit
-
- autocommit = property(get_autocommit, set_autocommit)
-
- @require_open_connection
- def begin(self):
- self.conn.begin()
-
- def _notificationReceived(self, notice):
- try:
- # psycopg2 compatible notification interface
- self.notifies_lock.acquire()
- self.notifies.append((notice.backend_pid, notice.condition))
- finally:
- self.notifies_lock.release()
-
- ##
- # Creates a {@link #CursorWrapper CursorWrapper} object bound to this
- # connection.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_connection
- def cursor(self):
- return CursorWrapper(self.conn, self)
-
- ##
- # Commits the current database transaction.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_connection
- def commit(self):
- # There's a threading bug here. If a query is sent after the
- # commit, but before the begin, it will be executed immediately
- # without a surrounding transaction. Like all threading bugs -- it
- # sounds unlikely, until it happens every time in one
- # application... however, to fix this, we need to lock the
- # database connection entirely, so that no cursors can execute
- # statements on other threads. Support for that type of lock will
- # be done later.
- if self.__tpc_xid:
- raise ProgrammingError("Cannot do a normal commit() inside a "
- "TPC transaction!")
- self.conn.commit()
-
- ##
- # Rolls back the current database transaction.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_connection
- def rollback(self):
- # see bug description in commit.
- if self.__tpc_xid:
- raise ProgrammingError("Cannot do a normal rollback() inside a "
- "TPC transaction!")
- self.conn.rollback()
-
- ##
- # Closes the database connection.
- #
- # Stability: Part of the DBAPI 2.0 specification.
- @require_open_connection
- def close(self):
- self.conn.close()
- self.conn = None
-
- ##
- # Returns the "server_version" string provided by the connected server.
- #
- # Stability: Extension of the DBAPI 2.0 specification.
- @property
- @require_open_connection
- def server_version(self):
- return self.conn.server_version()
-
- # Stability: psycopg2 compatibility
- @require_open_connection
- def set_client_encoding(self, encoding=None):
- "Set the client encoding for the current session"
- if encoding:
- self.conn.execute("SET client_encoding TO '%s';" % (encoding, ), simple_query=True)
- return self.conn.encoding()
-
-
- def xid(self,format_id, global_transaction_id, branch_qualifier):
- """Create a Transaction IDs (only global_transaction_id is used in pg)
- format_id and branch_qualifier are not used in postgres
- global_transaction_id may be any string identifier supported by postgres
- returns a tuple (format_id, global_transaction_id, branch_qualifier)"""
- return (format_id, global_transaction_id, branch_qualifier)
-
- @require_open_connection
- def tpc_begin(self,xid):
- "Begin a two-phase transaction"
- # set auto-commit mode to begin a TPC transaction
- self.autocommit = False
- # (actually in postgres at this point it is a normal one)
- if self.conn.in_transaction:
- warn("tpc_begin() should be called outside a transaction block",
- stacklevel=3)
- self.conn.begin()
- # store actual TPC transaction id
- self.__tpc_xid = xid
- self.__tpc_prepared = False
-
- @require_open_connection
- def tpc_prepare(self):
- "Prepare a two-phase transaction"
- if not self.__tpc_xid:
- raise ProgrammingError("tpc_prepare() outside a TPC transaction "
- "is not allowed!")
- # Prepare the TPC
- self.conn.execute("PREPARE TRANSACTION '%s';" % (self.__tpc_xid[1],),
- simple_query=True)
- self.conn.in_transaction = False
- self.__tpc_prepared = True
-
- @require_open_connection
- def tpc_commit(self, xid=None):
- "Commit a prepared two-phase transaction"
- try:
- # save current autocommit status (to be recovered later)
- previous_autocommit_mode = self.autocommit
- if not xid:
- # use current tpc transaction
- tpc_xid = self.__tpc_xid
- else:
- # use a recovered tpc transaction
- tpc_xid = xid
- if not xid in self.tpc_recover():
- raise ProgrammingError("Requested TPC transaction is not "
- "prepared!")
- if not tpc_xid:
- raise ProgrammingError("Cannot tpc_commit() without a TPC "
- "transaction!")
- if self.__tpc_prepared or (xid != self.__tpc_xid and xid):
- # a two-phase commit:
- # set the auto-commit mode for TPC commit
- self.autocommit = True
- try:
- self.conn.execute("COMMIT PREPARED '%s';" % (tpc_xid[1], ),
- simple_query=True)
- finally:
- # return to previous auto-commit mode
- self.autocommit = previous_autocommit_mode
- else:
- try:
- # a single-phase commit
- self.conn.commit()
- finally:
- # return to previous auto-commit mode
- self.autocommit = previous_autocommit_mode
- finally:
- # transaction is done, clear xid
- self.__tpc_xid = None
-
- @require_open_connection
- def tpc_rollback(self, xid=None):
- "Commit a prepared two-phase transaction"
- try:
- # save current autocommit status (to be recovered later)
- previous_autocommit_mode = self.autocommit
- if not xid:
- # use current tpc transaction
- tpc_xid = self.__tpc_xid
- else:
- # use a recovered tpc transaction
- tpc_xid = xid
- if not xid in self.tpc_recover():
- raise ProgrammingError("Requested TPC transaction is not prepared!")
- if not tpc_xid:
- raise ProgrammingError("Cannot tpc_rollback() without a TPC prepared transaction!")
- if self.__tpc_prepared or (xid != self.__tpc_xid and xid):
- # a two-phase rollback
- # set auto-commit for the TPC rollback
- self.autocommit = True
- try:
- self.conn.execute("ROLLBACK PREPARED '%s';" % (tpc_xid[1],),
- simple_query=True)
- finally:
- # return to previous auto-commit mode
- self.autocommit = previous_autocommit_mode
- else:
- # a single-phase rollback
- try:
- self.conn.rollback()
- finally:
- # return to previous auto-commit mode
- self.autocommit = previous_autocommit_mode
- finally:
- # transaction is done, clear xid
- self.__tpc_xid = None
-
- @require_open_connection
- def tpc_recover(self):
- "Returns a list of pending transaction IDs"
- previous_autocommit_mode = self.autocommit
- if not self.conn.in_transaction and not self.autocommit:
- self.autocommit = True
- elif not self.autocommit:
- warn("tpc_recover() will open a transaction block", stacklevel=3)
-
- curs = self.cursor()
- xids = []
- try:
- # query system view that stores open (prepared) TPC transactions
- curs.execute("SELECT gid FROM pg_prepared_xacts;");
- xids.extend([self.xid(0,row[0],'') for row in curs])
- finally:
- curs.close()
- # return to previous auto-commit mode
- self.autocommit = previous_autocommit_mode
- # return a list of TPC transaction ids (xid)
- return xids
-
-
-##
-# Creates a DBAPI 2.0 compatible interface to a PostgreSQL database.
-#
-# Stability: Part of the DBAPI 2.0 specification.
-#
-# @param user The username to connect to the PostgreSQL server with. This
-# parameter is required.
-#
-# @keyparam host The hostname of the PostgreSQL server to connect with.
-# Providing this parameter is necessary for TCP/IP connections. One of either
-# host, or unix_sock, must be provided.
-#
-# @keyparam unix_sock The path to the UNIX socket to access the database
-# through, for example, '/tmp/.s.PGSQL.5432'. One of either unix_sock or host
-# must be provided. The port parameter will have no affect if unix_sock is
-# provided.
-#
-# @keyparam port The TCP/IP port of the PostgreSQL server instance. This
-# parameter defaults to 5432, the registered and common port of PostgreSQL
-# TCP/IP servers.
-#
-# @keyparam database The name of the database instance to connect with. This
-# parameter is optional, if omitted the PostgreSQL server will assume the
-# database name is the same as the username.
-#
-# @keyparam password The user password to connect to the server with. This
-# parameter is optional. If omitted, and the database server requests password
-# based authentication, the connection will fail. On the other hand, if this
-# parameter is provided and the database does not request password
-# authentication, then the password will not be used.
-#
-# @keyparam socket_timeout Socket connect timeout measured in seconds.
-# Defaults to 60 seconds.
-#
-# @keyparam ssl Use SSL encryption for TCP/IP socket. Defaults to False.
-#
-# @return An instance of {@link #ConnectionWrapper ConnectionWrapper}.
-def connect(dsn="", user=None, host=None, unix_sock=None, port=5432, database=None, password=None, socket_timeout=60, ssl=False):
- return ConnectionWrapper(dsn=dsn, user=user, host=host,
- unix_sock=unix_sock, port=port, database=database,
- password=password, socket_timeout=socket_timeout, ssl=ssl)
-
-def Date(year, month, day):
- return datetime.date(year, month, day)
-
-def Time(hour, minute, second):
- return datetime.time(hour, minute, second)
-
-def Timestamp(year, month, day, hour, minute, second):
- return datetime.datetime(year, month, day, hour, minute, second)
-
-def DateFromTicks(ticks):
- return Date(*time.localtime(ticks)[:3])
-
-def TimeFromTicks(ticks):
- return Time(*time.localtime(ticks)[3:6])
-
-def TimestampFromTicks(ticks):
- return Timestamp(*time.localtime(ticks)[:6])
-
-##
-# Construct an object holding binary data.
-def Binary(value):
- return types.Bytea(value)
-
-# I have no idea what this would be used for by a client app. Should it be
-# TEXT, VARCHAR, CHAR? It will only compare against row_description's
-# type_code if it is this one type. It is the varchar type oid for now, this
-# appears to match expectations in the DB API 2.0 compliance test suite.
-STRING = 1043
-
-# bytea type_oid
-BINARY = 17
-
-# numeric type_oid
-NUMBER = 1700
-
-# timestamp type_oid
-DATETIME = 1114
-
-# oid type_oid
-ROWID = 26
-
-
diff --git a/gluon/contrib/pg8000/errors.py b/gluon/contrib/pg8000/errors.py
deleted file mode 100644
index b8b5acfb..00000000
--- a/gluon/contrib/pg8000/errors.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# vim: sw=4:expandtab:foldmethod=marker
-#
-# Copyright (c) 2007-2009, Mathieu Fenniak
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright notice,
-# this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-# * The name of the author may not be used to endorse or promote products
-# derived from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-__author__ = "Mathieu Fenniak"
-
-class Warning(StandardError):
- pass
-
-class Error(StandardError):
- pass
-
-class InterfaceError(Error):
- pass
-
-class ConnectionClosedError(InterfaceError):
- def __init__(self):
- InterfaceError.__init__(self, "connection is closed")
-
-class CursorClosedError(InterfaceError):
- def __init__(self):
- InterfaceError.__init__(self, "cursor is closed")
-
-class DatabaseError(Error):
- pass
-
-class DataError(DatabaseError):
- pass
-
-class OperationalError(DatabaseError):
- pass
-
-class IntegrityError(DatabaseError):
- pass
-
-class InternalError(DatabaseError):
- pass
-
-class ProgrammingError(DatabaseError):
- pass
-
-class NotSupportedError(DatabaseError):
- pass
-
-##
-# An exception that is thrown when an internal error occurs trying to
-# decode binary array data from the server.
-class ArrayDataParseError(InternalError):
- pass
-
-##
-# Thrown when attempting to transmit an array of unsupported data types.
-class ArrayContentNotSupportedError(NotSupportedError):
- pass
-
-##
-# Thrown when attempting to send an array that doesn't contain all the same
-# type of objects (eg. some floats, some ints).
-class ArrayContentNotHomogenousError(ProgrammingError):
- pass
-
-##
-# Attempted to pass an empty array in, but it's not possible to determine the
-# data type for an empty array.
-class ArrayContentEmptyError(ProgrammingError):
- pass
-
-##
-# Attempted to use a multidimensional array with inconsistent array sizes.
-class ArrayDimensionsNotConsistentError(ProgrammingError):
- pass
-
-# A cursor's copy_to or copy_from argument was not provided a table or query
-# to operate on.
-class CopyQueryOrTableRequiredError(ProgrammingError):
- pass
-
-# Raised if a COPY query is executed without using copy_to or copy_from
-# functions to provide a data stream.
-class CopyQueryWithoutStreamError(ProgrammingError):
- pass
-
-# When query parameters don't match up with query args.
-class QueryParameterIndexError(ProgrammingError):
- pass
-
-# Some sort of parse error occured during query parameterization.
-class QueryParameterParseError(ProgrammingError):
- pass
-
diff --git a/gluon/contrib/pg8000/interface.py b/gluon/contrib/pg8000/interface.py
deleted file mode 100644
index e3ccda00..00000000
--- a/gluon/contrib/pg8000/interface.py
+++ /dev/null
@@ -1,660 +0,0 @@
-# vim: sw=4:expandtab:foldmethod=marker
-#
-# Copyright (c) 2007-2009, Mathieu Fenniak
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright notice,
-# this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-# * The name of the author may not be used to endorse or promote products
-# derived from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-__author__ = "Mathieu Fenniak"
-
-import socket
-import protocol
-import threading
-from errors import *
-
-def conninfo_parse(conninfo):
- "Conninfo parser routine based on libpq conninfo_parse"
- options = {}
- buf = conninfo + " "
- tmp = pname = ""
- quoted_string = False
- cp = 0
- while cp < len(buf):
- # Skip blanks before the parameter name
- c = buf[cp]
- if c.isspace() and tmp and not quoted_string and pname:
- options[pname] = tmp
- tmp = pname = ""
- elif c == "'":
- quoted_string = not quoted_string
- elif c == '\\':
- cp += 1
- tmp += buf[cp]
- elif c == "=":
- if not tmp:
- raise RuntimeError("missing parameter name (conninfo:%s)" % cp)
- pname = tmp
- tmp = ""
- elif not c.isspace() or quoted_string:
- tmp += c
- cp += 1
- if quoted_string:
- raise RuntimeError("unterminated quoted string (conninfo:%s)" % cp)
- return options
-
-class DataIterator(object):
- def __init__(self, obj, func):
- self.obj = obj
- self.func = func
-
- def __iter__(self):
- return self
-
- def next(self):
- retval = self.func(self.obj)
- if retval == None:
- raise StopIteration()
- return retval
-
-statement_number_lock = threading.Lock()
-statement_number = 0
-
-##
-# This class represents a prepared statement. A prepared statement is
-# pre-parsed on the server, which reduces the need to parse the query every
-# time it is run. The statement can have parameters in the form of $1, $2, $3,
-# etc. When parameters are used, the types of the parameters need to be
-# specified when creating the prepared statement.
-#
-# As of v1.01, instances of this class are thread-safe. This means that a
-# single PreparedStatement can be accessed by multiple threads without the
-# internal consistency of the statement being altered. However, the
-# responsibility is on the client application to ensure that one thread reading
-# from a statement isn't affected by another thread starting a new query with
-# the same statement.
-#
-# Stability: Added in v1.00, stability guaranteed for v1.xx.
-#
-# @param connection An instance of {@link Connection Connection}.
-#
-# @param statement The SQL statement to be represented, often containing
-# parameters in the form of $1, $2, $3, etc.
-#
-# @param types Python type objects for each parameter in the SQL
-# statement. For example, int, float, str.
-class PreparedStatement(object):
-
- ##
- # Determines the number of rows to read from the database server at once.
- # Reading more rows increases performance at the cost of memory. The
- # default value is 100 rows. The affect of this parameter is transparent.
- # That is, the library reads more rows when the cache is empty
- # automatically.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx. It is
- # possible that implementation changes in the future could cause this
- # parameter to be ignored.
- row_cache_size = 100
-
- def __init__(self, connection, statement, *types, **kwargs):
- global statement_number
- if connection == None or connection.c == None:
- raise InterfaceError("connection not provided")
- try:
- statement_number_lock.acquire()
- self._statement_number = statement_number
- statement_number += 1
- finally:
- statement_number_lock.release()
- self.c = connection.c
- self._portal_name = None
- self._statement_name = kwargs.get("statement_name", "pg8000_statement_%s" % self._statement_number)
- self._row_desc = None
- self._cached_rows = []
- self._ongoing_row_count = 0
- self._command_complete = True
- self._parse_row_desc = self.c.parse(self._statement_name, statement, types)
- self._lock = threading.RLock()
-
- def close(self):
- if self._statement_name != "": # don't close unnamed statement
- self.c.close_statement(self._statement_name)
- if self._portal_name != None:
- self.c.close_portal(self._portal_name)
- self._portal_name = None
-
- row_description = property(lambda self: self._getRowDescription())
- def _getRowDescription(self):
- if self._row_desc == None:
- return None
- return self._row_desc.fields
-
- ##
- # Run the SQL prepared statement with the given parameters.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def execute(self, *args, **kwargs):
- self._lock.acquire()
- try:
- if not self._command_complete:
- # cleanup last execute
- self._cached_rows = []
- self._ongoing_row_count = 0
- if self._portal_name != None:
- self.c.close_portal(self._portal_name)
- self._command_complete = False
- self._portal_name = "pg8000_portal_%s" % self._statement_number
- self._row_desc, cmd = self.c.bind(self._portal_name, self._statement_name, args, self._parse_row_desc, kwargs.get("stream"))
- if self._row_desc:
- # We execute our cursor right away to fill up our cache. This
- # prevents the cursor from being destroyed, apparently, by a rogue
- # Sync between Bind and Execute. Since it is quite likely that
- # data will be read from us right away anyways, this seems a safe
- # move for now.
- self._fill_cache()
- else:
- self._command_complete = True
- self._ongoing_row_count = -1
- if cmd != None and cmd.rows != None:
- self._ongoing_row_count = cmd.rows
- finally:
- self._lock.release()
-
- def _fill_cache(self):
- self._lock.acquire()
- try:
- if self._cached_rows:
- raise InternalError("attempt to fill cache that isn't empty")
- end_of_data, rows = self.c.fetch_rows(self._portal_name, self.row_cache_size, self._row_desc)
- self._cached_rows = rows
- if end_of_data:
- self._command_complete = True
- finally:
- self._lock.release()
-
- def _fetch(self):
- if not self._row_desc:
- raise ProgrammingError("no result set")
- self._lock.acquire()
- try:
- if not self._cached_rows:
- if self._command_complete:
- return None
- self._fill_cache()
- if self._command_complete and not self._cached_rows:
- # fill cache tells us the command is complete, but yet we have
- # no rows after filling our cache. This is a special case when
- # a query returns no rows.
- return None
- row = self._cached_rows.pop(0)
- self._ongoing_row_count += 1
- return tuple(row)
- finally:
- self._lock.release()
-
- ##
- # Return a count of the number of rows relevant to the executed statement.
- # For a SELECT, this is the number of rows returned. For UPDATE or DELETE,
- # this the number of rows affected. For INSERT, the number of rows
- # inserted. This property may have a value of -1 to indicate that there
- # was no row count.
- #
- # During a result-set query (eg. SELECT, or INSERT ... RETURNING ...),
- # accessing this property requires reading the entire result-set into
- # memory, as reading the data to completion is the only way to determine
- # the total number of rows. Avoid using this property in with
- # result-set queries, as it may cause unexpected memory usage.
- #
- # Stability: Added in v1.03, stability guaranteed for v1.xx.
- row_count = property(lambda self: self._get_row_count())
- def _get_row_count(self):
- self._lock.acquire()
- try:
- if not self._command_complete:
- end_of_data, rows = self.c.fetch_rows(self._portal_name, 0, self._row_desc)
- self._cached_rows += rows
- if end_of_data:
- self._command_complete = True
- else:
- raise InternalError("fetch_rows(0) did not hit end of data")
- return self._ongoing_row_count + len(self._cached_rows)
- finally:
- self._lock.release()
-
- ##
- # Read a row from the database server, and return it in a dictionary
- # indexed by column name/alias. This method will raise an error if two
- # columns have the same name. Returns None after the last row.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def read_dict(self):
- row = self._fetch()
- if row == None:
- return row
- retval = {}
- for i in range(len(self._row_desc.fields)):
- col_name = self._row_desc.fields[i]['name']
- if retval.has_key(col_name):
- raise InterfaceError("cannot return dict of row when two columns have the same name (%r)" % (col_name,))
- retval[col_name] = row[i]
- return retval
-
- ##
- # Read a row from the database server, and return it as a tuple of values.
- # Returns None after the last row.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def read_tuple(self):
- return self._fetch()
-
- ##
- # Return an iterator for the output of this statement. The iterator will
- # return a tuple for each row, in the same manner as {@link
- # #PreparedStatement.read_tuple read_tuple}.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def iterate_tuple(self):
- return DataIterator(self, PreparedStatement.read_tuple)
-
- ##
- # Return an iterator for the output of this statement. The iterator will
- # return a dict for each row, in the same manner as {@link
- # #PreparedStatement.read_dict read_dict}.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def iterate_dict(self):
- return DataIterator(self, PreparedStatement.read_dict)
-
-
-class SimpleStatement(PreparedStatement):
- "Internal wrapper to Simple Query protocol emulating a PreparedStatement"
-
- # This should be used internally only for trivial queries
- # (not a true Prepared Statement, in fact it can have multiple statements)
- # See Simple Query Protocol limitations and trade-offs (send_simple_query)
-
- row_cache_size = None
-
- def __init__(self, connection, statement):
- if connection == None or connection.c == None:
- raise InterfaceError("connection not provided")
- self.c = connection.c
- self._row_desc = None
- self._cached_rows = []
- self._ongoing_row_count = -1
- self._command_complete = True
- self.statement = statement
- self._lock = threading.RLock()
-
- def close(self):
- # simple query doesn't have portals
- pass
-
- def execute(self, *args, **kwargs):
- "Run the SQL simple query stataments"
- self._lock.acquire()
- try:
- self._row_desc, cmd_complete, self._cached_rows = \
- self.c.send_simple_query(self.statement, kwargs.get("stream"))
- self._command_complete = True
- self._ongoing_row_count = -1
- if cmd_complete is not None and cmd_complete.rows is not None:
- self._ongoing_row_count = cmd_complete.rows
- finally:
- self._lock.release()
-
- def _fill_cache(self):
- # data rows are already fetched in _cached_rows
- pass
-
- def _fetch(self):
- if not self._row_desc:
- raise ProgrammingError("no result set")
- self._lock.acquire()
- try:
- if not self._cached_rows:
- return None
- row = self._cached_rows.pop(0)
- return tuple(row)
- finally:
- self._lock.release()
-
- def _get_row_count(self):
- return self._ongoing_row_count
-
-
-##
-# The Cursor class allows multiple queries to be performed concurrently with a
-# single PostgreSQL connection. The Cursor object is implemented internally by
-# using a {@link PreparedStatement PreparedStatement} object, so if you plan to
-# use a statement multiple times, you might as well create a PreparedStatement
-# and save a small amount of reparsing time.
-#
-# As of v1.01, instances of this class are thread-safe. See {@link
-# PreparedStatement PreparedStatement} for more information.
-#
-# Stability: Added in v1.00, stability guaranteed for v1.xx.
-#
-# @param connection An instance of {@link Connection Connection}.
-class Cursor(object):
- def __init__(self, connection):
- self.connection = connection
- self._stmt = None
-
- def require_stmt(func):
- def retval(self, *args, **kwargs):
- if self._stmt == None:
- raise ProgrammingError("attempting to use unexecuted cursor")
- return func(self, *args, **kwargs)
- return retval
-
- row_description = property(lambda self: self._getRowDescription())
- def _getRowDescription(self):
- if self._stmt == None:
- return None
- return self._stmt.row_description
-
- ##
- # Run an SQL statement using this cursor. The SQL statement can have
- # parameters in the form of $1, $2, $3, etc., which will be filled in by
- # the additional arguments passed to this function.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- # @param query The SQL statement to execute.
- def execute(self, query, *args, **kwargs):
- if self.connection.is_closed:
- raise ConnectionClosedError()
- self.connection._unnamed_prepared_statement_lock.acquire()
- try:
- if kwargs.get("simple_query"):
- # no arguments and no statement name,
- # use PostgreSQL Simple Query Protocol
- ## print "SimpleQuery:", query
- self._stmt = SimpleStatement(self.connection, query)
- else:
- # use PostgreSQL Extended Query Protocol
- self._stmt = PreparedStatement(self.connection, query, statement_name="", *[{"type": type(x), "value": x} for x in args])
- self._stmt.execute(*args, **kwargs)
- finally:
- self.connection._unnamed_prepared_statement_lock.release()
-
- ##
- # Return a count of the number of rows currently being read. If possible,
- # please avoid using this function. It requires reading the entire result
- # set from the database to determine the number of rows being returned.
- #
- # Stability: Added in v1.03, stability guaranteed for v1.xx.
- # Implementation currently requires caching entire result set into memory,
- # avoid using this property.
- row_count = property(lambda self: self._get_row_count())
-
- @require_stmt
- def _get_row_count(self):
- return self._stmt.row_count
-
- ##
- # Read a row from the database server, and return it in a dictionary
- # indexed by column name/alias. This method will raise an error if two
- # columns have the same name. Returns None after the last row.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- @require_stmt
- def read_dict(self):
- return self._stmt.read_dict()
-
- ##
- # Read a row from the database server, and return it as a tuple of values.
- # Returns None after the last row.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- @require_stmt
- def read_tuple(self):
- return self._stmt.read_tuple()
-
- ##
- # Return an iterator for the output of this statement. The iterator will
- # return a tuple for each row, in the same manner as {@link
- # #PreparedStatement.read_tuple read_tuple}.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- @require_stmt
- def iterate_tuple(self):
- return self._stmt.iterate_tuple()
-
- ##
- # Return an iterator for the output of this statement. The iterator will
- # return a dict for each row, in the same manner as {@link
- # #PreparedStatement.read_dict read_dict}.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- @require_stmt
- def iterate_dict(self):
- return self._stmt.iterate_dict()
-
- def close(self):
- if self._stmt != None:
- self._stmt.close()
- self._stmt = None
-
-
- ##
- # Return the fileno of the underlying socket for this cursor's connection.
- #
- # Stability: Added in v1.07, stability guaranteed for v1.xx.
- def fileno(self):
- return self.connection.fileno()
-
- ##
- # Poll the underlying socket for this cursor and sync if there is data waiting
- # to be read. This has the effect of flushing asynchronous messages from the
- # backend. Returns True if messages were read, False otherwise.
- #
- # Stability: Added in v1.07, stability guaranteed for v1.xx.
- def isready(self):
- return self.connection.isready()
-
-
-##
-# This class represents a connection to a PostgreSQL database.
-#
-# The database connection is derived from the {@link #Cursor Cursor} class,
-# which provides a default cursor for running queries. It also provides
-# transaction control via the 'begin', 'commit', and 'rollback' methods.
-# Without beginning a transaction explicitly, all statements will autocommit to
-# the database.
-#
-# As of v1.01, instances of this class are thread-safe. See {@link
-# PreparedStatement PreparedStatement} for more information.
-#
-# Stability: Added in v1.00, stability guaranteed for v1.xx.
-#
-# @param user The username to connect to the PostgreSQL server with. This
-# parameter is required.
-#
-# @keyparam host The hostname of the PostgreSQL server to connect with.
-# Providing this parameter is necessary for TCP/IP connections. One of either
-# host, or unix_sock, must be provided.
-#
-# @keyparam unix_sock The path to the UNIX socket to access the database
-# through, for example, '/tmp/.s.PGSQL.5432'. One of either unix_sock or host
-# must be provided. The port parameter will have no affect if unix_sock is
-# provided.
-#
-# @keyparam port The TCP/IP port of the PostgreSQL server instance. This
-# parameter defaults to 5432, the registered and common port of PostgreSQL
-# TCP/IP servers.
-#
-# @keyparam database The name of the database instance to connect with. This
-# parameter is optional, if omitted the PostgreSQL server will assume the
-# database name is the same as the username.
-#
-# @keyparam password The user password to connect to the server with. This
-# parameter is optional. If omitted, and the database server requests password
-# based authentication, the connection will fail. On the other hand, if this
-# parameter is provided and the database does not request password
-# authentication, then the password will not be used.
-#
-# @keyparam socket_timeout Socket connect timeout measured in seconds.
-# Defaults to 60 seconds.
-#
-# @keyparam ssl Use SSL encryption for TCP/IP socket. Defaults to False.
-class Connection(Cursor):
- def __init__(self, dsn="", user=None, host=None, unix_sock=None, port=5432, database=None, password=None, socket_timeout=60, ssl=False):
- self._row_desc = None
- if dsn:
- # update connection parameters parsed of the conninfo dsn
- opts = conninfo_parse(dsn)
- database = opts.get("dbname", database)
- user = opts.get("user", user)
- password = opts.get("password", user)
- host = opts.get("host", host)
- port = int(opts.get("port", port))
- ssl = opts.get("sslmode", 'disable') != 'disable'
- try:
- self.c = protocol.Connection(unix_sock=unix_sock, host=host, port=port, socket_timeout=socket_timeout, ssl=ssl)
- self.c.authenticate(user, password=password, database=database)
- except socket.error, e:
- raise InterfaceError("communication error", e)
- Cursor.__init__(self, self)
- self._begin = PreparedStatement(self, "BEGIN TRANSACTION")
- self._commit = PreparedStatement(self, "COMMIT TRANSACTION")
- self._rollback = PreparedStatement(self, "ROLLBACK TRANSACTION")
- self._unnamed_prepared_statement_lock = threading.RLock()
- self.in_transaction = False
- self.autocommit = False
-
- ##
- # An event handler that is fired when NOTIFY occurs for a notification that
- # has been LISTEN'd for. The value of this property is a
- # util.MulticastDelegate. A callback can be added by using
- # connection.NotificationReceived += SomeMethod. The method will be called
- # with a single argument, an object that has properties: backend_pid,
- # condition, and additional_info. Callbacks can be removed with the -=
- # operator.
- #
- # Stability: Added in v1.03, stability guaranteed for v1.xx.
- NotificationReceived = property(
- lambda self: getattr(self.c, "NotificationReceived"),
- lambda self, value: setattr(self.c, "NotificationReceived", value)
- )
-
- ##
- # An event handler that is fired when the database server issues a notice.
- # The value of this property is a util.MulticastDelegate. A callback can
- # be added by using connection.NotificationReceived += SomeMethod. The
- # method will be called with a single argument, an object that has
- # properties: severity, code, msg, and possibly others (detail, hint,
- # position, where, file, line, and routine). Callbacks can be removed with
- # the -= operator.
- #
- # Stability: Added in v1.03, stability guaranteed for v1.xx.
- NoticeReceived = property(
- lambda self: getattr(self.c, "NoticeReceived"),
- lambda self, value: setattr(self.c, "NoticeReceived", value)
- )
-
- ##
- # An event handler that is fired when a runtime configuration option is
- # changed on the server. The value of this property is a
- # util.MulticastDelegate. A callback can be added by using
- # connection.NotificationReceived += SomeMethod. Callbacks can be removed
- # with the -= operator. The method will be called with a single argument,
- # an object that has properties "key" and "value".
- #
- # Stability: Added in v1.03, stability guaranteed for v1.xx.
- ParameterStatusReceived = property(
- lambda self: getattr(self.c, "ParameterStatusReceived"),
- lambda self, value: setattr(self.c, "ParameterStatusReceived", value)
- )
-
- ##
- # Begins a new transaction.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def begin(self):
- if self.is_closed:
- raise ConnectionClosedError()
- if self.autocommit:
- return
- self._begin.execute()
- self.in_transaction = True
-
-
- ##
- # Commits the running transaction.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def commit(self):
- if self.is_closed:
- raise ConnectionClosedError()
- self._commit.execute()
- self.in_transaction = False
-
- ##
- # Rolls back the running transaction.
- #
- # Stability: Added in v1.00, stability guaranteed for v1.xx.
- def rollback(self):
- if self.is_closed:
- raise ConnectionClosedError()
- self._rollback.execute()
- self.in_transaction = False
-
- ##
- # Closes an open connection.
- def close(self):
- if self.is_closed:
- raise ConnectionClosedError()
- self.c.close()
- self.c = None
-
- is_closed = property(lambda self: self.c == None)
-
- ##
- # Return the fileno of the underlying socket for this connection.
- #
- # Stability: Added in v1.07, stability guaranteed for v1.xx.
- def fileno(self):
- return self.c.fileno()
-
- ##
- # Poll the underlying socket for this connection and sync if there is data
- # waiting to be read. This has the effect of flushing asynchronous
- # messages from the backend. Returns True if messages were read, False
- # otherwise.
- #
- # Stability: Added in v1.07, stability guaranteed for v1.xx.
- def isready(self):
- return self.c.isready()
-
- ##
- # Return the server_version as reported from the connected server.
- # Raises InterfaceError if no version has been reported from the server.
- def server_version(self):
- return self.c.server_version()
-
- def encoding(self, encoding=None):
- "Returns the client_encoding as reported from the connected server"
- return self.c.encoding()
\ No newline at end of file
diff --git a/gluon/contrib/pg8000/protocol.py b/gluon/contrib/pg8000/protocol.py
deleted file mode 100644
index 16265d28..00000000
--- a/gluon/contrib/pg8000/protocol.py
+++ /dev/null
@@ -1,1411 +0,0 @@
-# vim: sw=4:expandtab:foldmethod=marker
-#
-# Copyright (c) 2007-2009, Mathieu Fenniak
-# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are
-# met:
-#
-# * Redistributions of source code must retain the above copyright notice,
-# this list of conditions and the following disclaimer.
-# * Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
-# and/or other materials provided with the distribution.
-# * The name of the author may not be used to endorse or promote products
-# derived from this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-__author__ = "Mathieu Fenniak"
-
-import socket
-try:
- import ssl as sslmodule
-except ImportError:
- sslmodule = None
-import select
-import threading
-import struct
-import hashlib
-from cStringIO import StringIO
-
-from errors import *
-from util import MulticastDelegate
-import types
-
-##
-# An SSLRequest message. To initiate an SSL-encrypted connection, an
-# SSLRequest message is used rather than a {@link StartupMessage
-# StartupMessage}. A StartupMessage is still sent, but only after SSL
-# negotiation (if accepted).
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class SSLRequest(object):
- def __init__(self):
- pass
-
- # Int32(8) - Message length, including self.
-# Stability: This is an internal class. No stability guarantee is made.
-class StartupMessage(object):
- def __init__(self, user, database=None):
- self.user = user
- self.database = database
-
- # Int32 - Message length, including self.
- # Int32(196608) - Protocol version number. Version 3.0.
- # Any number of key/value pairs, terminated by a zero byte:
- # String - A parameter name (user, database, or options)
- # String - Parameter value
- def serialize(self):
- protocol = 196608
- val = struct.pack("!i", protocol)
- val += "user\x00" + self.user + "\x00"
- if self.database:
- val += "database\x00" + self.database + "\x00"
- val += "\x00"
- val = struct.pack("!i", len(val) + 4) + val
- return val
-
-
-##
-# Parse message. Creates a prepared statement in the DB session.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-#
-# @param ps Name of the prepared statement to create.
-# @param qs Query string.
-# @param type_oids An iterable that contains the PostgreSQL type OIDs for
-# parameters in the query string.
-class Parse(object):
- def __init__(self, ps, qs, type_oids):
- if isinstance(qs, unicode):
- raise TypeError("qs must be encoded byte data")
- self.ps = ps
- self.qs = qs
- self.type_oids = type_oids
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-#
-# @param portal Name of the destination portal.
-# @param ps Name of the source prepared statement.
-# @param in_fc An iterable containing the format codes for input
-# parameters. 0 = Text, 1 = Binary.
-# @param params The parameters.
-# @param out_fc An iterable containing the format codes for output
-# parameters. 0 = Text, 1 = Binary.
-# @param kwargs Additional arguments to pass to the type conversion
-# methods.
-class Bind(object):
- def __init__(self, portal, ps, in_fc, params, out_fc, **kwargs):
- self.portal = portal
- self.ps = ps
- self.in_fc = in_fc
- self.params = []
- for i in range(len(params)):
- if len(self.in_fc) == 0:
- fc = 0
- elif len(self.in_fc) == 1:
- fc = self.in_fc[0]
- else:
- fc = self.in_fc[i]
- self.params.append(types.pg_value(params[i], fc, **kwargs))
- self.out_fc = out_fc
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-#
-# @param typ 'S' for prepared statement, 'P' for portal.
-# @param name The name of the item to close.
-class Close(object):
- def __init__(self, typ, name):
- if len(typ) != 1:
- raise InternalError("Close typ must be 1 char")
- self.typ = typ
- self.name = name
-
- # Byte1('C') - Identifies the message as a close command.
- # Int32 - Message length, including self.
- # Byte1 - 'S' for prepared statement, 'P' for portal.
- # String - The name of the item to close.
- def serialize(self):
- val = self.typ + self.name + "\x00"
- val = struct.pack("!i", len(val) + 4) + val
- val = "C" + val
- return val
-
-
-##
-# A specialized Close message for a portal.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class ClosePortal(Close):
- def __init__(self, name):
- Close.__init__(self, "P", name)
-
-
-##
-# A specialized Close message for a prepared statement.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class ClosePreparedStatement(Close):
- def __init__(self, name):
- Close.__init__(self, "S", name)
-
-
-##
-# A Describe message, used for obtaining information on prepared statements
-# and portals.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-#
-# @param typ 'S' for prepared statement, 'P' for portal.
-# @param name The name of the item to close.
-class Describe(object):
- def __init__(self, typ, name):
- if len(typ) != 1:
- raise InternalError("Describe typ must be 1 char")
- self.typ = typ
- self.name = name
-
- # Byte1('D') - Identifies the message as a describe command.
- # Int32 - Message length, including self.
- # Byte1 - 'S' for prepared statement, 'P' for portal.
- # String - The name of the item to close.
- def serialize(self):
- val = self.typ + self.name + "\x00"
- val = struct.pack("!i", len(val) + 4) + val
- val = "D" + val
- return val
-
-
-##
-# A specialized Describe message for a portal.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class DescribePortal(Describe):
- def __init__(self, name):
- Describe.__init__(self, "P", name)
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-class DescribePreparedStatement(Describe):
- def __init__(self, name):
- Describe.__init__(self, "S", name)
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-class Flush(object):
- # Byte1('H') - Identifies the message as a flush command.
- # Int32(4) - Length of message, including self.
- def serialize(self):
- return 'H\x00\x00\x00\x04'
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-class Sync(object):
- # Byte1('S') - Identifies the message as a sync command.
- # Int32(4) - Length of message, including self.
- def serialize(self):
- return 'S\x00\x00\x00\x04'
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-class PasswordMessage(object):
- def __init__(self, pwd):
- self.pwd = pwd
-
- # Byte1('p') - Identifies the message as a password message.
- # Int32 - Message length including self.
- # String - The password. Password may be encrypted.
- def serialize(self):
- val = self.pwd + "\x00"
- val = struct.pack("!i", len(val) + 4) + val
- val = "p" + val
- return val
-
-
-##
-# Requests that the backend execute a portal and retrieve any number of rows.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-# @param row_count The number of rows to return. Can be zero to indicate the
-# backend should return all rows. If the portal represents a
-# query that does not return rows, no rows will be returned
-# no matter what the row_count.
-class Execute(object):
- def __init__(self, portal, row_count):
- self.portal = portal
- self.row_count = row_count
-
- # Byte1('E') - Identifies the message as an execute message.
- # Int32 - Message length, including self.
- # String - The name of the portal to execute.
- # Int32 - Maximum number of rows to return, if portal contains a query that
- # returns rows. 0 = no limit.
- def serialize(self):
- val = self.portal + "\x00" + struct.pack("!i", self.row_count)
- val = struct.pack("!i", len(val) + 4) + val
- val = "E" + val
- return val
-
-
-class SimpleQuery(object):
- "Requests that the backend execute a Simple Query (SQL string)"
-
- def __init__(self, query_string):
- self.query_string = query_string
-
- # Byte1('Q') - Identifies the message as an query message.
- # Int32 - Message length, including self.
- # String - The query string itself.
- def serialize(self):
- val = self.query_string + "\x00"
- val = struct.pack("!i", len(val) + 4) + val
- val = "Q" + val
- return val
-
- def __repr__(self):
- return "
-# Stability: This is an internal class. No stability guarantee is made.
-class Terminate(object):
- def __init__(self):
- pass
-
- # Byte1('X') - Identifies the message as a terminate message.
- # Int32(4) - Message length, including self.
- def serialize(self):
- return 'X\x00\x00\x00\x04'
-
-##
-# Base class of all Authentication[*] messages.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class AuthenticationRequest(object):
- def __init__(self, data):
- pass
-
- # Byte1('R') - Identifies the message as an authentication request.
- # Int32(8) - Message length, including self.
- # Int32 - An authentication code that represents different
- # authentication messages:
- # 0 = AuthenticationOk
- # 5 = MD5 pwd
- # 2 = Kerberos v5 (not supported by pg8000)
- # 3 = Cleartext pwd (not supported by pg8000)
- # 4 = crypt() pwd (not supported by pg8000)
- # 6 = SCM credential (not supported by pg8000)
- # 7 = GSSAPI (not supported by pg8000)
- # 8 = GSSAPI data (not supported by pg8000)
- # 9 = SSPI (not supported by pg8000)
- # Some authentication messages have additional data following the
- # authentication code. That data is documented in the appropriate class.
- def createFromData(data):
- ident = struct.unpack("!i", data[:4])[0]
- klass = authentication_codes.get(ident, None)
- if klass != None:
- return klass(data[4:])
- else:
- raise NotSupportedError("authentication method %r not supported" % (ident,))
- createFromData = staticmethod(createFromData)
-
- def ok(self, conn, user, **kwargs):
- raise InternalError("ok method should be overridden on AuthenticationRequest instance")
-
-##
-# A message representing that the backend accepting the provided username
-# without any challenge.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class AuthenticationOk(AuthenticationRequest):
- def ok(self, conn, user, **kwargs):
- return True
-
-
-##
-# A message representing the backend requesting an MD5 hashed password
-# response. The response will be sent as md5(md5(pwd + login) + salt).
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class AuthenticationMD5Password(AuthenticationRequest):
- # Additional message data:
- # Byte4 - Hash salt.
- def __init__(self, data):
- self.salt = "".join(struct.unpack("4c", data))
-
- def ok(self, conn, user, password=None, **kwargs):
- if password == None:
- raise InterfaceError("server requesting MD5 password authentication, but no password was provided")
- pwd = "md5" + hashlib.md5(hashlib.md5(password + user).hexdigest() + self.salt).hexdigest()
- conn._send(PasswordMessage(pwd))
- conn._flush()
-
- reader = MessageReader(conn)
- reader.add_message(AuthenticationRequest, lambda msg, reader: reader.return_value(msg.ok(conn, user)), reader)
- reader.add_message(ErrorResponse, self._ok_error)
- return reader.handle_messages()
-
- def _ok_error(self, msg):
- if msg.code == "28000":
- raise InterfaceError("md5 password authentication failed")
- else:
- raise msg.createException()
-
-authentication_codes = {
- 0: AuthenticationOk,
- 5: AuthenticationMD5Password,
-}
-
-
-##
-# ParameterStatus message sent from backend, used to inform the frotnend of
-# runtime configuration parameter changes.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class ParameterStatus(object):
- def __init__(self, key, value):
- self.key = key
- self.value = value
-
- # Byte1('S') - Identifies ParameterStatus
- # Int32 - Message length, including self.
- # String - Runtime parameter name.
- # String - Runtime parameter value.
- def createFromData(data):
- key = data[:data.find("\x00")]
- value = data[data.find("\x00")+1:-1]
- return ParameterStatus(key, value)
- createFromData = staticmethod(createFromData)
-
-
-##
-# BackendKeyData message sent from backend. Contains a connection's process
-# ID and a secret key. Can be used to terminate the connection's current
-# actions, such as a long running query. Not supported by pg8000 yet.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class BackendKeyData(object):
- def __init__(self, process_id, secret_key):
- self.process_id = process_id
- self.secret_key = secret_key
-
- # Byte1('K') - Identifier.
- # Int32(12) - Message length, including self.
- # Int32 - Process ID.
- # Int32 - Secret key.
- def createFromData(data):
- process_id, secret_key = struct.unpack("!2i", data)
- return BackendKeyData(process_id, secret_key)
- createFromData = staticmethod(createFromData)
-
-
-##
-# Message representing a query with no data.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class NoData(object):
- # Byte1('n') - Identifier.
- # Int32(4) - Message length, including self.
- def createFromData(data):
- return NoData()
- createFromData = staticmethod(createFromData)
-
-
-##
-# Message representing a successful Parse.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class ParseComplete(object):
- # Byte1('1') - Identifier.
- # Int32(4) - Message length, including self.
- def createFromData(data):
- return ParseComplete()
- createFromData = staticmethod(createFromData)
-
-
-##
-# Message representing a successful Bind.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class BindComplete(object):
- # Byte1('2') - Identifier.
- # Int32(4) - Message length, including self.
- def createFromData(data):
- return BindComplete()
- createFromData = staticmethod(createFromData)
-
-
-##
-# Message representing a successful Close.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class CloseComplete(object):
- # Byte1('3') - Identifier.
- # Int32(4) - Message length, including self.
- def createFromData(data):
- return CloseComplete()
- createFromData = staticmethod(createFromData)
-
-
-##
-# Message representing data from an Execute has been received, but more data
-# exists in the portal.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class PortalSuspended(object):
- # Byte1('s') - Identifier.
- # Int32(4) - Message length, including self.
- def createFromData(data):
- return PortalSuspended()
- createFromData = staticmethod(createFromData)
-
-
-##
-# Message representing the backend is ready to process a new query.
-#
-# Stability: This is an internal class. No stability guarantee is made.
-class ReadyForQuery(object):
- def __init__(self, status):
- self._status = status
-
- ##
- # I = Idle, T = Idle in Transaction, E = idle in failed transaction.
- status = property(lambda self: self._status)
-
- def __repr__(self):
- return "
-# A NoticeResponse instance will have properties containing the data sent
-# from the server:
-#
-# Stability: Added in pg8000 v1.03. Required properties severity, code, and
-# msg are guaranteed for v1.xx. Other properties should be checked with
-# hasattr before accessing.
-class NoticeResponse(object):
- responseKeys = {
- "S": "severity", # always present
- "C": "code", # always present
- "M": "msg", # always present
- "D": "detail",
- "H": "hint",
- "P": "position",
- "p": "_position",
- "q": "_query",
- "W": "where",
- "F": "file",
- "L": "line",
- "R": "routine",
- }
-
- def __init__(self, **kwargs):
- for arg, value in kwargs.items():
- setattr(self, arg, value)
-
- def __repr__(self):
- return "
-# Stability: Added in pg8000 v1.03. Required properties severity, code, and
-# msg are guaranteed for v1.xx. Other properties should be checked with
-# hasattr before accessing.
-class ErrorResponse(object):
- def __init__(self, **kwargs):
- for arg, value in kwargs.items():
- setattr(self, arg, value)
-
- def __repr__(self):
- return "
-# Stability: Added in pg8000 v1.03. When limited to accessing properties from
-# a notification event dispatch, stability is guaranteed for v1.xx.
-class NotificationResponse(object):
- def __init__(self, backend_pid, condition, additional_info):
- self._backend_pid = backend_pid
- self._condition = condition
- self._additional_info = additional_info
-
- ##
- # An integer representing the process ID of the backend that triggered
- # the NOTIFY.
- #
- # Stability: Added in pg8000 v1.03, stability guaranteed for v1.xx.
- backend_pid = property(lambda self: self._backend_pid)
-
- ##
- # The name of the notification fired.
- #
- # Stability: Added in pg8000 v1.03, stability guaranteed for v1.xx.
- condition = property(lambda self: self._condition)
-
- ##
- # Currently unspecified by the PostgreSQL documentation as of v8.3.1.
- #
- # Stability: Added in pg8000 v1.03, stability guaranteed for v1.xx.
- additional_info = property(lambda self: self._additional_info)
-
- def __repr__(self):
- return "
- # Int32(80877103) - The SSL request code.
- def serialize(self):
- return struct.pack("!ii", 8, 80877103)
-
-
-##
-# A StartupMessage message. Begins a DB session, identifying the user to be
-# authenticated as and the database to connect to.
-#
-#
-#