Old question but adding some useful Python recipe links
According to cx_Oracle documentation:
This read and write attribute indicates the invocation method for each row that is retrieved from the database. Usually a tuple is returned for each row, but if this attribute is set, the method is called using a tuple, which is usually returned, and the result of the method is returned instead .
cx_Oracle - Python interface for Oracle database. Also points to the GitHub repository for a lot of useful examples . Check out GenericRowFactory.py .
Googled: this PPT might be useful: [PDF] CON6543 Python and Oracle Database - RainFocus
Recipe
The Django database backend for Oracle under the hood uses cx_Oracle. In earlier versions (Django 1.11-) they wrote _rowfactory(cursor, row) It also leads the cx_Oracle numeric data types to the corresponding Python data and strings in unicode.
If you installed Django, check base.py as follows:
$ DJANGO_DIR="$(python -c 'import django, os; print(os.path.dirname(django.__file__))')" $ vim $DJANGO_DIR/db/backends/oracle/base.py
You can take _rowfactory() from $DJANGO_DIR/db/backends/oracle/base.py and use the naming decorator below to return namedtuple instead of just tuple .
mybase.py
import functools from itertools import izip, imap from operator import itemgetter from collections import namedtuple import cx_Oracle as Database import decimal def naming(rename=False, case=None): def decorator(rowfactory): @functools.wraps(rowfactory) def decorated_rowfactory(cursor, row, typename="GenericRow"): field_names = imap(case, imap(itemgetter(0), cursor.description)) return namedtuple(typename, field_names)._make(rowfactory(cursor, row)) return decorated_rowfactory return decorator
use it like:
@naming(rename=False, case=str.lower) def rowfactory(cursor, row): casted = [] .... .... return tuple(casted)
oracle.py
import cx_Oracle as Database from cx_Oracle import * import mybase class Cursor(Database.Cursor): def execute(self, statement, args=None): prepareNested = (statement is not None and self.statement != statement) result = super(self.__class__, self).execute(statement, args or []) if prepareNested: if self.description: self.rowfactory = lambda *row: mybase.rowfactory(self, row) return result def close(self): try: super(self.__class__, self).close() except Database.InterfaceError: "already closed" class Connection(Database.Connection): def cursor(self): Cursor(self) connect = Connection
Now, instead of importing cx_oracle, import oracle into a user script as:
user.py
import oracle dsn = oracle.makedsn('HOSTNAME', 1521, service_name='dev_server') db = connect('username', 'password', dsn) cursor = db.cursor() cursor.execute(""" SELECT 'Grijesh' as FirstName, 'Chauhan' as LastName, CAST('10560.254' AS NUMBER(10, 2)) as Salary FROM DUAL """) row = cursor.fetchone() print ("First Name is %s" % row.firstname) # => Grijesh print ("Last Name is %s" % row.lastname) # => Chauhan print ("Salary is %r" % row.salary) # => Decimal('10560.25')
Give it a try!