2017-03-12 2 views
0

J'utilise Python Twisted pour obtenir des données en streaming depuis l'API de streaming de Twitter. En bref, il y a deux étapes : 1) obtenir un access_token ; 2) utiliser cet access_token pour faire une requête de données. L'étape 1 fonctionne parfaitement, mais à l'étape 2 j'obtiens une erreur « Bad Request » (statut 400). Pourquoi ? Je pense que c'est parce que Twitter utilise HTTP/1.1 alors que Twisted utilise HTTP/1.0 par défaut. Comment alors passer les connexions en HTTP/1.1 ? (Titre : « Twisted et l'API de streaming Twitter : erreur Bad Request »)

EDIT: Voici mon message d'erreur

HTTP/1.0 400 Bad Request 
content-length: 0 
date: Sun, 12 Mar 2017 14:57:13 GMT 
server: tsa 
x-connection-hash: dca361a2b4214ad66203e9912b05cf7f 

[Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly. 

.

#!/usr/bin/python 
import oauth2 as oauth 
import urlparse 
import time 
import webbrowser 
from twisted.internet import reactor, protocol, ssl 
from twisted.web import http 


# File in which the access token key/secret are cached between runs.
ACCESS_TOKEN_FILE = 'OAUTH_ACCESS_TOKEN'

# Twitter OAuth endpoints; nothing in this script references them yet.
TWITTER_REQUEST_TOKEN_URL = 'https://twitter.com/oauth/request_token'
TWITTER_ACCESS_TOKEN_URL = 'https://twitter.com/oauth/access_token'
TWITTER_AUTHORIZE_URL = 'https://twitter.com/oauth/authorize'

# Host and path of the streaming endpoint that is signed and requested.
TWITTER_STREAM_API_HOST = 'stream.twitter.com'
TWITTER_STREAM_API_PATH = '/1.1/statuses/sample.json'

# Application (consumer) credentials -- placeholders to fill in.
CONSUMER_KEY = 'xxxx'
CONSUMER_SECRET = 'xxxx'
CONSUMER = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)


class TwitterStreamer(http.HTTPClient):
    """HTTP client protocol that issues one streaming GET and reports body lines.

    The request path, Host, User-Agent and Authorization values are read from
    the factory. Status-line and header parsing is left to ``http.HTTPClient``;
    only the response body is split into CRLF-delimited lines, each of which
    is passed to ``factory.tweetReceived``. Errors go to ``factory.tweetError``.
    """

    # Delimiter used to split the streamed body into lines.
    body_delimiter = '\r\n'

    def connectionMade(self):
        # Holds an incomplete trailing line until the rest of it arrives.
        self._partial = ''
        self.sendCommand('GET', self.factory.url)
        self.sendHeader('Host', self.factory.host)
        self.sendHeader('User-Agent', self.factory.agent)
        self.sendHeader('Authorization', self.factory.oauth_header)
        self.endHeaders()

    def handleStatus(self, version, status, message):
        """Report a non-200 status and drop the connection instead of reading on."""
        if status != '200':
            self.factory.tweetError(ValueError("bad status"))
            self.transport.loseConnection()

    def handleResponsePart(self, data):
        """Reassemble raw body chunks into complete lines and forward each one."""
        # Overriding this hook (instead of lineReceived) keeps HTTPClient's
        # status/header parsing intact, so handleStatus is actually called and
        # header lines are no longer reported as tweets. It also replaces the
        # base implementation, which would buffer the never-ending body.
        self._partial += data
        lines = self._partial.split(self.body_delimiter)
        # The last element is an unterminated fragment (possibly empty).
        self._partial = lines.pop()
        for line in lines:
            self.factory.tweetReceived(line)

    def handleResponse(self, response):
        # Body data has already been forwarded line by line; nothing to do.
        pass

    def connectionLost(self, reason):
        self.factory.tweetError(reason)


class TwitterStreamerFactory(protocol.ClientFactory):
    """Client factory holding the request settings used by TwitterStreamer.

    Every protocol it builds reads ``url``, ``host``, ``agent`` and
    ``oauth_header`` from here. Received lines and errors are printed.
    """

    protocol = TwitterStreamer

    def __init__(self, oauth_header):
        self.oauth_header = oauth_header
        self.host = TWITTER_STREAM_API_HOST
        self.url = TWITTER_STREAM_API_PATH
        self.agent = 'Twisted/TwitterStreamer'

    def clientConnectionFailed(self, _, reason):
        # The connector argument is ignored; only the failure is reported.
        self.tweetError(reason)

    def tweetReceived(self, tweet):
        print(tweet)

    def tweetError(self, error):
        print(error)


def save_access_token(key, secret):
    """Persist the access token as two ``NAME=value`` lines in ACCESS_TOKEN_FILE."""
    with open(ACCESS_TOKEN_FILE, 'w') as token_file:
        write = token_file.write
        write("ACCESS_KEY=%s\n" % key)
        write("ACCESS_SECRET=%s\n" % secret)


def load_access_token():
    """Read ACCESS_TOKEN_FILE and return its credentials as an oauth.Token.

    The first two lines are expected to be ``ACCESS_KEY=<key>`` and
    ``ACCESS_SECRET=<secret>`` (the format save_access_token writes).
    Raises IOError if the file is missing and IndexError if it has fewer
    than two lines or a line without ``=``.
    """
    with open(ACCESS_TOKEN_FILE) as f:
        lines = f.readlines()

    # Split on the first '=' only, so a value containing '=' is kept whole.
    str_key = lines[0].strip().split('=', 1)[1]
    str_secret = lines[1].strip().split('=', 1)[1]
    return oauth.Token(key=str_key, secret=str_secret)


def fetch_access_token():
    """Return the hard-coded ``(access_key, access_secret)`` pair.

    Despite the name, no OAuth exchange takes place: the values below are
    placeholders to be replaced with the application's own access token.
    """
    ACCESS_KEY = "xxxxxxx"
    ACCESS_SECRET = "xxxxxxxxx"
    return (ACCESS_KEY, ACCESS_SECRET)


def build_authorization_header(access_token):
    """Sign a GET to the streaming endpoint and return its Authorization header.

    :param access_token: oauth.Token that signs the request together with the
        module-level CONSUMER.
    :returns: the ``Authorization`` header value, UTF-8 encoded. It is also
        printed.
    """
    stream_url = "https://%s%s" % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    oauth_params = dict(
        oauth_version="1.0",
        oauth_nonce=oauth.generate_nonce(),
        oauth_timestamp=str(int(time.time())),
        oauth_token=access_token.key,
        oauth_consumer_key=CONSUMER.key,
    )

    # is_form_encoded=True stops the oauth2 library from adding an
    # oauth_body_hash parameter, which Twitter does not accept.
    signed = oauth.Request(method="GET", url=stream_url,
                           parameters=oauth_params, is_form_encoded=True)
    signed.sign_request(oauth.SignatureMethod_HMAC_SHA1(), CONSUMER, access_token)

    header_text = signed.to_header()['Authorization']
    header = header_text.encode('utf-8')
    print("Authorization header:")
    print("  header = %s" % header)
    return header

if __name__ == '__main__':
    # Make sure an access token is saved on disk. fetch_access_token() only
    # returns hard-coded credentials; it does not run an OAuth exchange.
    try:
        # Probe for the file and close it immediately.
        with open(ACCESS_TOKEN_FILE):
            pass
    except IOError:
        (access_token_key, access_token_secret) = fetch_access_token()
        # Save the access token for next time.
        save_access_token(access_token_key, access_token_secret)

    # Load access token from disk.
    access_token = load_access_token()

    # Build Authorization header from the access_token.
    auth_header = build_authorization_header(access_token)

    # Twitter stream using the Authorization header.
    twsf = TwitterStreamerFactory(auth_header)
    # NOTE(review): ssl.ClientContextFactory does no certificate verification;
    # ssl.optionsForClientTLS(hostname) is the verifying alternative in newer
    # Twisted releases -- confirm against the installed version.
    reactor.connectSSL(TWITTER_STREAM_API_HOST, 443, twsf, ssl.ClientContextFactory())
    reactor.run()

MISE À JOUR : code fonctionnel :

import base64, urllib 
from twisted.internet import reactor 
from twisted.internet.defer import Deferred 
from twisted.protocols import basic 
from twisted.python.failure import DefaultException 
from twisted.web.client import Agent 
from twisted.web.http_headers import Headers 
import json 
import oauth2 as oauth 
import time 
from twisted.web import server,resource 
from twisted.internet import endpoints 
from twisted.web.server import Site 
# Host and path of the streaming endpoint; make_header signs requests to it.
TWITTER_STREAM_API_HOST = 'stream.twitter.com'
TWITTER_STREAM_API_PATH = '/1.1/statuses/sample.json'
# File in which the access token key/secret are cached between runs.
ACCESS_TOKEN_FILE = 'OAUTH_ACCESS_TOKEN'

# Application (consumer) credentials -- placeholders to fill in.
CONSUMER_KEY = 'xxxxxxxxxxxx'
CONSUMER_SECRET = 'xxxxxxxxxxxxxx'
CONSUMER = oauth.Consumer(CONSUMER_KEY, CONSUMER_SECRET)


def callback(result):
    """Print one successfully decoded stream item."""
    print(result)


def errback(error):
    """Print an error reported by the stream machinery."""
    print(error)

class StreamingParser(basic.LineReceiver):
    """Body protocol that decodes each CRLF-delimited line of a stream as JSON.

    Each decoded object is passed to ``user_callback`` through a Deferred, so
    an exception raised by that callback is routed to ``user_errback`` too.
    Undecodable lines and the end of the connection are reported to
    ``user_errback`` when one was given.
    """

    delimiter = '\r\n'

    def __init__(self, user_callback, user_errback):
        self.user_callback = user_callback
        self.user_errback = user_errback

    def lineReceived(self, line):
        line = line.strip()
        print('%s %s' % (line, '........'))
        if not line:
            # Blank lines carry no JSON (the streaming API sends them as
            # keep-alives); decoding them would report a spurious ValueError.
            return
        d = Deferred()
        d.addCallback(self.user_callback)
        d.addErrback(self.user_errback)
        try:
            payload = json.loads(line)
        except ValueError as e:
            if self.user_errback:
                d.errback(e)
        else:
            d.callback(payload)

    def connectionLost(self, reason):
        # Any end of the body -- including a clean finish -- is reported as
        # an error to user_errback.
        if self.user_errback:
            d = Deferred()
            d.addErrback(self.user_errback)
            d.errback(DefaultException(reason.getErrorMessage()))

def _get_response(response, callback, errback):
    """Stream the response body into a new StreamingParser.

    The returned Deferred is never fired, so callbacks chained after this one
    (e.g. _shutdown in start_streaming) stay paused while the body streams.
    """
    print('got response......')
    parser = StreamingParser(callback, errback)
    response.deliverBody(parser)
    pending = Deferred()
    return pending

def _shutdown(reason, errback):
    """Report *reason* to *errback* through a Deferred, then stop a running reactor."""
    failed = Deferred()
    failed.addErrback(errback)
    failed.errback(reason)
    if not reactor.running:
        return
    reactor.stop()

def save_access_token(key, secret):
    """Write the access token key and secret to ACCESS_TOKEN_FILE.

    The file gets two lines, ``ACCESS_KEY=<key>`` and ``ACCESS_SECRET=<secret>``,
    the format load_access_token reads back.
    """
    # The with-body must be indented; the unindented version is an
    # IndentationError and prevents the module from loading.
    with open(ACCESS_TOKEN_FILE, 'w') as f:
        f.write("ACCESS_KEY=%s\n" % key)
        f.write("ACCESS_SECRET=%s\n" % secret)


def load_access_token():
    """Read ACCESS_TOKEN_FILE and return its credentials as an oauth.Token.

    The first two lines are expected to be ``ACCESS_KEY=<key>`` and
    ``ACCESS_SECRET=<secret>``. Raises IOError if the file is missing and
    IndexError if it has fewer than two lines or a line without ``=``.
    """
    with open(ACCESS_TOKEN_FILE) as f:
        lines = f.readlines()

    # Split on the first '=' only, so a value containing '=' is kept whole.
    str_key = lines[0].strip().split('=', 1)[1]
    str_secret = lines[1].strip().split('=', 1)[1]
    return oauth.Token(key=str_key, secret=str_secret)


def fetch_access_token():
    """Return the hard-coded ``(access_key, access_secret)`` pair.

    No OAuth exchange happens here; replace the placeholders with the
    application's own access token.
    """
    ACCESS_KEY = "xxxxx-xxxx"
    ACCESS_SECRET = "xxxxxxxxxxxx"
    return (ACCESS_KEY, ACCESS_SECRET)


def make_header(access_token):
    """Return the signed OAuth ``Authorization`` header for the stream request.

    :param access_token: oauth.Token used together with CONSUMER to sign a GET
        to ``https://`` + TWITTER_STREAM_API_HOST + TWITTER_STREAM_API_PATH.
    :returns: the header value encoded as UTF-8. It is also printed.
    """
    target = "https://%s%s" % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    params = {}
    params["oauth_version"] = "1.0"
    params["oauth_nonce"] = oauth.generate_nonce()
    params["oauth_timestamp"] = str(int(time.time()))
    params["oauth_token"] = access_token.key
    params["oauth_consumer_key"] = CONSUMER.key

    # is_form_encoded=True keeps the oauth2 library from adding an
    # oauth_body_hash parameter to the signed request.
    request = oauth.Request(method="GET", url=target, parameters=params,
                            is_form_encoded=True)
    request.sign_request(oauth.SignatureMethod_HMAC_SHA1(), CONSUMER, access_token)
    header = request.to_header()['Authorization'].encode('utf-8')
    for message in ("Authorization header:", "  header = %s" % header):
        print(message)
    return header

def start_streaming():
    """Open the Twitter sample stream with Agent and feed it to StreamingParser.

    Ensures an access token is saved (falling back to fetch_access_token's
    hard-coded values), signs the request with make_header and sends it.

    :returns: the Deferred from ``Agent.request`` with _get_response and
        _shutdown chained onto it.
    """
    print('streaming started...........')
    try:
        # Only probing whether the file exists; close it right away.
        with open(ACCESS_TOKEN_FILE):
            pass
    except IOError:
        access_token_key, access_token_secret = fetch_access_token()
        save_access_token(access_token_key, access_token_secret)

    access_token = load_access_token()
    auth_header = make_header(access_token)
    # Build the URL from the same constants make_header signs, so the signed
    # URL and the requested URL cannot drift apart.
    url = 'https://%s%s' % (TWITTER_STREAM_API_HOST, TWITTER_STREAM_API_PATH)
    headers = Headers({
        'User-Agent': ['TwistedSTreamReciever'],
        'Authorization': [auth_header]})
    agent = Agent(reactor)
    d = agent.request('GET', url, headers, None)
    d.addCallback(_get_response, callback, errback)
    # _shutdown stops the reactor once it runs -- which, when this is called
    # from the web server below, also stops that server.
    d.addBoth(_shutdown, errback)
    return d

class _Stream(resource.Resource):
    """Leaf web resource whose GET starts a Twitter stream."""

    isLeaf = True

    def render_GET(self, request):
        """Start streaming and answer the HTTP request immediately."""
        start_streaming()
        # No time.sleep() here: render_GET runs on the reactor thread, so
        # sleeping would freeze the reactor -- including the stream that was
        # just started -- for the whole delay. To stop a stream after some
        # time, schedule that with reactor.callLater and close the body
        # protocol's transport (transport.loseConnection()).
        return "<html>streaming started...........%s</html>" % (time.ctime(),)


if __name__ == "__main__": 

    resource = _Stream() 
    factory = Site(resource) 
    endpoint = endpoints.TCP4ServerEndpoint(reactor, 8880) 
    endpoint.listen(factory) 
    reactor.run() 
+2

Pourquoi utilisez-vous 'twisted.web.http.HTTPClient' ? Utilisez plutôt 'Agent'. –

+0

@Jean-PaulCalderone Merci pour la réponse. Je vais essayer d'utiliser 'Agent'. – anekix

+0

Ça fonctionne maintenant avec 'Agent'. J'ai également défini un point de terminaison HTTP qui démarre le streaming. Il me faut juste un petit indice sur la façon d'arrêter le streaming (peut-être en fermant la connexion établie par 'Agent'). J'ai ajouté mon code mis à jour pour référence. – anekix

Réponse

0

Pour renoncer à lire une réponse en streaming particulière (ce qui semble nécessaire ici — je suppose que ces flux Twitter ne se terminent jamais d'eux-mêmes) et fermer la connexion associée à cette requête/réponse (HTTP n'offre pas d'autre moyen d'abandonner une réponse), appelez la méthode transport.loseConnection du protocole auquel le corps de la réponse est livré (celui passé à deliverBody). Par exemple :

def _get_response(response, callback, errback):
    """Deliver the body to a StreamingParser that is registered for later shutdown.

    NOTE(review): ``save_stream_by_name`` and ``stream_name`` are not defined in
    this snippet; presumably the caller supplies a registry keyed by stream
    name -- TODO define them before use.
    """
    print('got response......')
    parser = StreamingParser(callback, errback)
    save_stream_by_name(stream_name, parser)
    response.deliverBody(parser)
    never_fired = Deferred()
    return never_fired

Lorsque vous en avez terminé avec ce flux :

# Stop one stream: look up its body protocol (presumably removing it from the
# registry filled by _get_response) and close the underlying connection --
# HTTP offers no other way to abandon a response.
# NOTE(review): pop_stream_by_name and stream_name are not defined in this snippet.
pop_stream_by_name(stream_name).transport.loseConnection()