Je veux utiliser le nez pour tester une application que j'écris en utilisant twisted et txmongo. Je ne peux même pas les cas simples d'utilisation comme le travail suivant: du réacteur nose.twistedtools importation, différé, threaded_reactor enregistrement des importations de reporter l'importation twisted.internet importation txmongoUtilisation du nez pour tester le code dépendant de txmongo
log = logging.getLogger("common.test.test_db")
conn = txmongo.lazyMongoConnectionPool('localhost', 27017, 4)
@deferred()
def test_mongo():
tdb = conn.test
@defer.inlineCallbacks
def cb(oid):
assert oid
obj = yield tdb.test.find({"_id":oid})
log.error("In callback")
assert obj
d = tdb.test.save({"s":1, "b":2})
d.addCallback(cb)
return d
Cependant, ce retour toujours les éléments suivants:
E
======================================================================
ERROR: common.test.test_db.test_mongo
----------------------------------------------------------------------
Traceback (most recent call last):
File "/Volumes/Users/jce/.pyenv/celery/lib/python2.6/site-packages/nose/case.py", line 186, in runTest
self.test(*self.arg)
File "/Volumes/Users/jce/.pyenv/celery/lib/python2.6/site-packages/nose/twistedtools.py", line 138, in errback
failure.raiseException()
File "/Volumes/Users/jce/.pyenv/celery/lib/python2.6/site-packages/twisted/python/failure.py", line 326, in raiseException
raise self.type, self.value, self.tb
RuntimeWarning: not connected
----------------------------------------------------------------------
Ran 1 test in 0.006s
FAILED (errors=1)
J'ai essayé d'ajouter manuellement un threaded_reactor() appel, mais il n'a pas aidé.
modifier
J'ai enlevé les connexions « paresseux », et modifié le code, et maintenant il fonctionne ... Je suis toujours curieux de savoir pourquoi ne fonctionne pas « paresseux ». Le code de travail est comme suit:
dbconn = txmongo.MongoConnectionPool('localhost', 27017, 4)
@deferred()
def test_mongo():
@defer.inlineCallbacks
def cb(conn):
tdb = conn.test
oid = yield tdb.test.save({"s":1, "b":2})
assert oid
log.error(str(oid))
obj = yield tdb.test.find({"_id":oid})
assert obj
log.error(str(obj))
dbconn.addCallback(cb)
return dbconn