I use twisted.enterprise.adbapi inside a Twisted .tac file, and I find that the
Deferred returned by functions such as aConnectionPool.runQuery(sqlQuery) does
not fire unless reactor.run() is called. How do I add a query to the reactor
loop created by twistd instead of calling reactor.run() myself? Is this a
general procedure, or is it something specific to the asynchronous database API?
Edit: code attached:
from twisted.application import internet, service
from zope.interface import implements
from twisted.web.iweb import IBodyProducer
from twisted.internet import defer, protocol, reactor
from twisted.internet.defer import succeed
from twisted.python import log
from twisted.web.client import Agent
from twisted.web.http_headers import Headers
import json
import base64
from twisted.enterprise import adbapi

# This is a twistd application (.tac) file: twistd starts the reactor itself,
# so nothing in here calls reactor.run().  All work is scheduled by returning
# or chaining Deferreds.
#
# The code is written for Python 2 (zope ``implements`` class advice, MySQLdb).
# Single-argument ``print(...)`` is used because it behaves identically on
# Python 2 and also parses on Python 3.

# Product whose purchases are recorded in the ``users`` table.
_PRODUCT_ID = "com.domain_name.app_name.a_product_id"

# Time-zone suffix trimmed from the purchase date before it is stored.
_GMT_SUFFIX = ' Etc/GMT'


class StringProducer(object):
    """IBodyProducer that writes one in-memory string as a request body."""
    implements(IBodyProducer)

    def __init__(self, body):
        self.body = body
        self.length = len(body)

    def startProducing(self, consumer):
        # The whole body is already in memory, so it is written in one go and
        # an already-fired Deferred signals completion.
        consumer.write(self.body)
        return succeed(None)

    def pauseProducing(self):
        pass

    def stopProducing(self):
        pass


class _BodyCollector(protocol.Protocol):
    """Buffer a response body and fire ``finished`` with it when delivery ends."""

    def __init__(self, finished):
        self.finished = finished
        self.chunks = []

    def dataReceived(self, data):
        # A body may arrive in several chunks; nothing is parsed until the
        # whole body has been delivered.
        self.chunks.append(data)

    def connectionLost(self, reason):
        # Fired for every reason, as the original did; ``reason`` is not
        # inspected here.
        self.finished.callback(b''.join(self.chunks))


def _strip_gmt_suffix(date):
    """Return ``date`` without a trailing ' Etc/GMT' and surrounding spaces.

    ``str.strip(' Etc/GMT')`` would strip any of those *characters* from both
    ends rather than removing the suffix, so the suffix is removed explicitly.
    """
    if date.endswith(_GMT_SUFFIX):
        date = date[:-len(_GMT_SUFFIX)]
    return date.strip()


def _record_purchase(body):
    """Parse a verifyReceipt reply and insert a row for a matching product.

    Returns ``body`` unchanged so the Deferred chain still yields the raw
    response text.  The insert runs asynchronously and is not waited on.
    Raises ValueError / KeyError (surfacing as a Failure on the calling
    Deferred) when the body is not JSON or lacks the expected keys.
    """
    response = json.loads(body)
    receipt = response[u'receipt']
    if receipt[u'product_id'] != _PRODUCT_ID:
        return body

    transactionID = receipt[u'original_transaction_id']
    purchaseDate = _strip_gmt_suffix(receipt[u'original_purchase_date'])
    print(transactionID)
    print(purchaseDate)

    # NOTE(review): a pool per receipt is kept from the original; a single
    # long-lived pool would avoid starting a thread pool for every insert.
    dbpool = adbapi.ConnectionPool('MySQLdb', db='mydb', user='user', passwd='passwd')
    dOperation = dbpool.runOperation(
        "insert into users(name, original_transaction_id, date_joined) values(%s, %s, %s)",
        ('testuser', transactionID, purchaseDate))

    def finishInsert(result):
        print('inserted!')

    def insertError(failure):
        print('insert error!')
        # Report why the insert failed instead of discarding the Failure.
        print(failure.getErrorMessage())

    def closePool(result):
        # Runs on success and on failure so the pool is never leaked.
        dbpool.close()
        return result

    dOperation.addCallback(finishInsert)
    dOperation.addErrback(insertError)
    dOperation.addBoth(closePool)
    return body


def httpRequest(url, values, headers=None, method='POST'):
    """Send ``values`` to ``url`` and record the purchase found in the reply.

    url     -- request URL.
    values  -- request body as a string.
    headers -- optional dict mapping header names to lists of values.
    method  -- HTTP method, 'POST' by default.

    Returns a Deferred that fires with the response body ('' for a 204
    response), or with a Failure if the request or the reply parsing fails.
    """
    agent = Agent(reactor)
    d = agent.request(method, url, Headers(headers or {}), StringProducer(values))

    def handle_response(response):
        if response.code == 204:
            # No content: there is no body to collect or parse.
            return defer.succeed('')
        finished = defer.Deferred()
        response.deliverBody(_BodyCollector(finished))
        finished.addCallback(_record_purchase)
        return finished

    d.addCallback(handle_response)
    return d


class StoreServer(protocol.Protocol):
    """Accept 'addToUserList:delimiter:<receipt>' and verify the receipt."""

    def dataReceived(self, data):
        # NOTE(review): assumes one complete command per dataReceived call;
        # TCP does not guarantee that -- confirm against the client.
        a = data.split(':delimiter:')
        if len(a) < 2 or a[0] != 'addToUserList':
            return
        receiptBase64 = base64.standard_b64encode(a[1])
        jsonReceipt = json.dumps({'receipt-data': receiptBase64})
        d = httpRequest(
            "https://buy.itunes.apple.com/verifyReceipt",
            jsonReceipt,
            {'Content-Type': ['application/x-www-form-urlencoded']}
        )
        # Log request or parsing failures instead of leaving them unhandled.
        d.addErrback(log.err)


application = service.Application("My Server")
storeFactory = protocol.Factory()
storeFactory.protocol = StoreServer
tcpStoreServer = internet.TCPServer(30000, storeFactory)
tcpStoreServer.setServiceParent(application)
source share