# ODBTPAPI - A Python interface to the ODBTP library. # Copyright (C) 2004 Benji York # # This library is free software; you can redistribute it and/or # modify it under the terms of the GNU Lesser General Public # License as published by the Free Software Foundation; either # version 2.1 of the License, or (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # Lesser General Public License for more details. # # You should have received a copy of the GNU Lesser General Public # License along with this library; if not, write to the Free Software # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Contact the author: benji@benjiyork.com import sys, struct, datetime, exceptions, time from ctypes import * odbtp = cdll.LoadLibrary('./libodbtp.so') # Constants from ODBTP that describe the data types. ODB_BINARY = -2 ODB_BIGINT = -25 ODB_UBIGINT = -27 ODB_BIT = -7 ODB_CHAR = 1 ODB_DATE = 91 ODB_DATETIME = 93 ODB_DOUBLE = 8 ODB_GUID = -11 ODB_INT = -16 ODB_UINT = -18 ODB_NUMERIC = 2 ODB_REAL = 7 ODB_SMALLINT = -15 ODB_USMALLINT = -17 ODB_TIME = 92 ODB_TINYINT = -26 ODB_UTINYINT = -28 ODB_WCHAR = -8 class TypeObject: """Instances of this compare equal to any item from the parameter list.""" def __init__(self,*values): self.values = values def __cmp__(self,other): if other in self.values: return 0 if other < self.values: return 1 else: return -1 STRING = TypeObject(ODB_CHAR, ODB_WCHAR) BINARY = TypeObject(ODB_BINARY) NUMBER = TypeObject(ODB_BIGINT, ODB_UBIGINT, ODB_BIT, ODB_DOUBLE, ODB_INT, ODB_UINT, ODB_NUMERIC, ODB_REAL, ODB_SMALLINT, ODB_USMALLINT, ODB_TINYINT, ODB_UTINYINT) DATETIME = TypeObject(ODB_DATE, ODB_DATETIME, ODB_TIME) ROWID = ODB_GUID # XXX Is this right? # TODO: Implement these (some have been started). def Date(year, month, day): raise NotImplementedError def Time(hour, minute, second): raise NotImplementedError def DateFromTicks(ticks): raise NotImplementedError return apply(Date,time.localtime(ticks)[:3]) def TimeFromTicks(ticks): raise NotImplementedError return apply(Time,time.localtime(ticks)[3:6]) def TimestampFromTicks(ticks): raise NotImplementedError return apply(Timestamp,time.localtime(ticks)[:6]) def Binary(string): raise NotImplementedError def stringFromPointer(pointerIn, length=None): myPointer = pointer(c_char.from_address(pointerIn)) index = 0 chars = [] while True: char = myPointer[index] index += 1 if length == None and char == '\x00': break chars.append(char) if length != None and index == length: break return ''.join(chars) def longFromPointer(pointerIn): return c_long.from_address(pointerIn).value def unsignedLongFromPointer(pointerIn): return c_ulong.from_address(pointerIn).value def longLongFromPointer(pointerIn): return c_longlong.from_address(pointerIn).value def unsignedLongLongFromPointer(pointerIn): return c_ulonglong.from_address(pointerIn).value def byteFromPointer(pointerIn): return c_byte.from_address(pointerIn).value def unsignedByteFromPointer(pointerIn): return c_byte.from_address(pointerIn).value def boolFromPointer(pointerIn): return bool(c_ubyte.from_address(pointerIn).value) def shortFromPointer(pointerIn): return c_short.from_address(pointerIn).value def unsignedShortFromPointer(pointerIn): return c_ushort.from_address(pointerIn).value def floatFromPointer(pointerIn): return c_float.from_address(pointerIn).value def doubleFromPointer(pointerIn): return c_double.from_address(pointerIn).value def dateTimeFromPointer(pointerIn): rawString = stringFromPointer(pointerIn, length=16) # Unpack structure, ignoring fractional seconds. (TODO: fix that) fields = struct.unpack('H5hxxxx', rawString) return datetime.datetime(*fields) convertions = { ODB_CHAR: stringFromPointer, ODB_INT: longFromPointer, ODB_UINT: unsignedLongFromPointer, ODB_BIGINT: longLongFromPointer, ODB_UBIGINT: unsignedLongLongFromPointer, ODB_BIT: boolFromPointer, ODB_SMALLINT: shortFromPointer, ODB_USMALLINT: unsignedShortFromPointer, ODB_TINYINT: byteFromPointer, ODB_UTINYINT: unsignedByteFromPointer, ODB_REAL: floatFromPointer, ODB_DOUBLE: doubleFromPointer, ODB_DATETIME: dateTimeFromPointer, } def getErrorText(item): return stringFromPointer(odbtp.odbGetErrorText(item)) # XXX Do I really want the ability to set a default? _defaultServer = '127.0.0.1' def setDefaultServer(server): global _defaultServer _defaultServer = server def getDefaultServer(): return _defaultServer class Connection: def __init__(self, connectString, server=None, port=2799): if server == None: server = _defaultServer self._connection = odbtp.odbAllocate(None) if not self._connection: raise Error('Error allocating connection.') if not odbtp.odbLogin(self._connection, server, port, 0, connectString): self._raiseError() def close(self): odbtp.odbLogout(self._connection, True) odbtp.odbFree(self._connection) def commit(self): if not odbtp.odbCommit(self._connection): self._raiseError() def rollback(self): if not odbtp.odbRollback(self._connection): self._raiseError() def cursor(self): return Cursor(self._connection) def _raiseError(self): raise Error(getErrorText(self._connection)) class Cursor: def __init__(self, connection): self._connection = connection self._query = odbtp.odbAllocate(self._connection) if not self._query: raise Error(getErrorText(self._query)) self.rowcount = -1 self.arraysize = 1 self.description = None # TODO: Populate this after execution. self._closed = False ## TODO: ## def callproc(self, procname): def close(self): """Close the cursor.""" # TODO: perhaps some resources should be freed here? self._closed = True def _checkClosed(self): """See if the cursor has been closed, and raise an exception if so.""" if self._closed: raise Error('Cursor has been closed.') def _setParam(self, number, value, isLast): """Select the appropriate way to set the parameter based on it's type.""" if isinstance(value, float): if not odbtp.odbSetParamDouble(self._query, number, value, isLast): self._raiseError() elif isinstance(value, int): if not odbtp.odbSetParamShort(self._query, number, value, isLast): self._raiseError() elif isinstance(value, long): if not odbtp.odbSetParamLongLong(self._query, number, c_longlong(value), isLast): self._raiseError() elif isinstance(value, basestring): if not odbtp.odbSetParamText(self._query, number, value, isLast): self._raiseError() elif value == None: if not odbtp.odbSetParamNull(self._query, number, isLast): self._raiseError() else: raise Error('Unknown data type for parameter.') def _updateDescription(self): odbtp.odbColName.restype = c_char_p numColumns = odbtp.odbGetTotalCols(self._query) description = [] for column in range(1, numColumns+1): description.append([]) description[-1].append(odbtp.odbColName(self._query, column)) description[-1].append(odbtp.odbColDataType(self._query, column)) description[-1].extend([None, None, None, None, None]) self.description = description def execute(self, operation, parameters=None): """Execute an operation with the provided parameters (if any).""" if parameters == None: parameters = [] self.executemany(operation, [parameters]) def executemany(self, operation, seq_of_parameters): """Execute an operation with the provided list of parameters.""" self._checkClosed() if not odbtp.odbPrepare(self._query, operation): self._raiseError() for parameters in seq_of_parameters: # This is kinda strange, but it's because all the parameters # must be bound before they are set, so I pre-generate and store # all of the values each loop will need. arguments = [] for index, value in enumerate(parameters): number = index + 1 isLast = (number == len(parameters)) arguments.append( (number, value, isLast) ) # Now use the pre-generated values to bind the parameters. for number, value, isLast in arguments: if not odbtp.odbBindInputParam(self._query,number,0,0,isLast): self._raiseError() # And now set the parameter values. for number, value, isLast in arguments: self._setParam(number, value, isLast) # Execute the operation with the bound parameters. if not odbtp.odbExecute(self._query, None): self._raiseError() self._updateDescription() def fetchone(self): """Retrieve a single record from the previous execution (if available).""" result = self.fetchmany(1) if result == []: return None else: return result[0] def fetchmany(self, size=None): """Retrieve several records from the previous execution (if available).""" self._checkClosed() numColumns = odbtp.odbGetTotalCols(self._query) if size == None: size = self.arraysize goodFetch = True rows = [] while len(rows) < size: goodFetch = odbtp.odbFetchRow(self._query) if not goodFetch: self._raiseError() if odbtp.odbNoData(self._query): break row = [] for column in range(1, numColumns+1): if odbtp.odbColTruncated(self._query, column): raise Warning('Column %d was truncated. ' 'Actual size is %d.' % (column, odbtp.odbColActualLen(self._query, column))) data = odbtp.odbColData(self._query, column) dataType = odbtp.odbColDataType(self._query, column) if convertions.has_key(dataType): value = convertions[dataType](data) elif dataType == ODB_BINARY: value = stringFromPointer(data, length=odbtp.odbColDataLen(self._query, column)) else: raise Error('Unknown data type.') row.append(value) rows.append(tuple(row)) return rows def fetchall(self): """Retrieve all records from the previous execution.""" rows = [] while True: # Default to fetching "arraysize" records at a time... size = self.arraysize # ...unless it is set to a very small value. if size == 1: size = 20 moreRows = self.fetchmany(size) rows.extend(moreRows) # If all the rows have been retrieved, stop. if len(moreRows) < size: break return rows def nextset(self): """Switch to the next result set, if there is one.""" self._checkClosed() if not odbtp.odbFetchNextResult(self._query): self._raiseError() def setinputsizes(self, sizes): """Ignored.""" pass def setoutputsize(self, size, column=None): """Ignored.""" pass def _raiseError(self): raise Error(getErrorText(self._query)) # TODO: actually use all of these class Error(exceptions.StandardError): pass class Warning(exceptions.StandardError): pass class InterfaceError(Error): pass class DatabaseError(Error): pass class InternalError(DatabaseError): pass class OperationalError(DatabaseError): pass class ProgrammingError(DatabaseError): pass class IntegrityError(DatabaseError): pass class DataError(DatabaseError): pass class NotSupportedError(DatabaseError): pass odbtp.odbWinsockStartup() connection = Connection apilevel = '2.0' threadsafety = 1 paramstyle = 'qmark'