De nos jours, les données se développent et s'accumulent plus rapidement que jamais. Actuellement, environ 90% de toutes les données générées dans notre monde ne l'ont été qu'au cours des deux dernières années. En raison de ce taux de croissance stupéfiant, les plates-formes de Big Data ont dû adopter des solutions radicales afin de maintenir des volumes de données aussi énormes.
L'une des principales sources de données aujourd'hui sont les réseaux sociaux. Permettez-moi de vous présenter un exemple concret: traiter, analyser et extraire des informations à partir des données des réseaux sociaux en temps réel à l'aide de l'une des solutions d'écho de Big Data les plus importantes: Apache Spark et Python.
Dans cet article, je vais vous apprendre à créer une application simple qui lit les flux en ligne de Twitter à l'aide de Python, puis traite les tweets à l'aide d'Apache Spark Streaming pour identifier les hashtags et, enfin, renvoie les hashtags les plus tendances et représente ces données sur un réel tableau de bord -time.
Pour recevoir des tweets de Twitter, vous devez vous inscrire sur TwitterApps en cliquant sur «Créer une nouvelle application», puis remplissez le formulaire ci-dessous, cliquez sur «Créez votre application Twitter».
Deuxièmement, accédez à votre application nouvellement créée et ouvrez l'onglet «Clés et jetons d'accès». Cliquez ensuite sur 'Générer mon jeton d'accès'.
Vos nouveaux jetons d'accès apparaîtront comme ci-dessous.
Et maintenant, vous êtes prêt pour la prochaine étape.
calculateur corp à corp vs w2
Dans cette étape, je vais vous montrer comment créer un client simple qui récupérera les tweets de l'API Twitter à l'aide de Python et les transmettra à l'instance Spark Streaming. Cela devrait être facile à suivre pour tout professionnel Développeur Python .
plan comptable des finances personnelles
Commençons par créer un fichier appelé twitter_app.py
puis nous y ajouterons le code comme ci-dessous.
Importez les bibliothèques que nous utiliserons comme ci-dessous:
import socket import sys import requests import requests_oauthlib import json
Et ajoutez les variables qui seront utilisées dans OAuth pour la connexion à Twitter comme ci-dessous:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
Maintenant, nous allons créer une nouvelle fonction appelée get_tweets
qui appellera l'URL de l'API Twitter et renverra la réponse pour un flux de tweets.
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
Ensuite, créez une fonction qui prend la réponse de celle ci-dessus et extrait le texte des tweets de tout l'objet JSON des tweets. Après cela, il envoie chaque tweet à l'instance Spark Streaming (sera discuté plus tard) via une connexion TCP.
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
Maintenant, nous allons créer la partie principale qui établira les connexions de socket hôte de l'application avec lesquelles Spark se connectera. Nous allons configurer l'adresse IP ici pour être localhost
car tous fonctionneront sur la même machine et le port 9009
. Ensuite, nous appellerons le get_tweets
méthode, que nous avons faite ci-dessus, pour obtenir les tweets de Twitter et transmettre sa réponse avec la connexion socket à send_tweets_to_spark
pour avoir envoyé les tweets à Spark.
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
Créons notre application de streaming Spark qui traitera en temps réel les tweets entrants, en extraira les hashtags et calculera le nombre de hashtags mentionnés.
Tout d'abord, nous devons créer une instance de Spark Context sc
, puis nous avons créé le Streaming Context ssc
de sc
avec un intervalle de lots de deux secondes qui effectuera la transformation sur tous les flux reçus toutes les deux secondes. Notez que nous avons défini le niveau de journalisation sur ERROR
afin de désactiver la plupart des journaux écrits par Spark.
Nous avons défini ici un point de contrôle afin de permettre des points de contrôle RDD périodiques; il est obligatoire de l'utiliser dans notre application, car nous utiliserons des transformations avec état (nous en parlerons plus tard dans la même section).
Ensuite, nous définissons notre DStream dataStream principal qui se connectera au serveur socket que nous avons créé auparavant sur le port 9009
et lisez les tweets de ce port. Chaque enregistrement du DStream sera un tweet.
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName('TwitterStreamApp') # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint('checkpoint_TwitterApp') # read data from port 9009 dataStream = ssc.socketTextStream('localhost',9009)
Nous allons maintenant définir notre logique de transformation. Nous allons d'abord diviser tous les tweets en mots et les mettre en mots RDD. Ensuite, nous filtrerons uniquement les hashtags de tous les mots et les mapperons sur une paire de (hashtag, 1)
et mettez-les dans les hashtags RDD.
tous les éléments suivants sont des principes de conception dans l'art, sauf lesquels ?
Ensuite, nous devons calculer combien de fois le hashtag a été mentionné. Nous pouvons le faire en utilisant la fonction reduceByKey
. Cette fonction calculera combien de fois le hashtag a été mentionné pour chaque lot, c'est-à-dire qu'elle réinitialisera les décomptes dans chaque lot.
Dans notre cas, nous devons calculer les décomptes pour tous les lots, nous allons donc utiliser une autre fonction appelée updateStateByKey
, car cette fonction vous permet de maintenir l'état de RDD tout en le mettant à jour avec de nouvelles données. Cette méthode s'appelle Stateful Transformation
.
Notez que pour utiliser updateStateByKey
, vous devez configurer un point de contrôle, et c'est ce que nous avons fait à l'étape précédente.
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(' ')) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
Le updateStateByKey
prend une fonction comme paramètre appelé update
fonction. Il s'exécute sur chaque élément de RDD et fait la logique souhaitée.
Dans notre cas, nous avons créé une fonction de mise à jour appelée aggregate_tags_count
qui résumera tous les new_values
pour chaque hashtag et ajoutez-les au total_sum
c'est la somme de tous les lots et enregistrez les données dans tags_totals
RDD.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
Ensuite, nous traitons tags_totals
RDD dans chaque lot afin de le convertir en table temporaire à l'aide de Spark SQL Context, puis effectuer une instruction select afin de récupérer les dix premiers hashtags avec leurs nombres et les mettre dans hashtag_counts_df
trame de données.
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable('hashtags') # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
La dernière étape de notre application Spark consiste à envoyer le hashtag_counts_df
trame de données à l'application de tableau de bord. Nous allons donc convertir le bloc de données en deux tableaux, l'un pour les hashtags et l'autre pour leurs décomptes. Ensuite, nous les enverrons à l'application de tableau de bord via l'API REST.
def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
Enfin, voici un exemple de sortie de Spark Streaming lors de l'exécution et de l'impression de hashtag_counts_df
, vous remarquerez que la sortie est imprimée exactement toutes les deux secondes selon les intervalles de lots.
Nous allons maintenant créer une application de tableau de bord simple qui sera mise à jour en temps réel par Spark. Nous allons le créer en utilisant Python, Flask et Charts.js .
Commençons par créer un projet Python avec la structure ci-dessous, puis téléchargez et ajoutez le Chart.js fichier dans le répertoire statique.
Ensuite, dans le app.py
fichier, nous allons créer une fonction appelée update_data
, qui sera appelée par Spark via l'URL http://localhost:5001/updateData
afin de mettre à jour les étiquettes globales et les tableaux de valeurs.
De plus, la fonction refresh_graph_data
est créé pour être appelé par une requête AJAX pour renvoyer les nouveaux tableaux d'étiquettes et de valeurs mis à jour au format JSON. La fonction get_chart_page
rendra le chart.html
page lors de l'appel.
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
Maintenant, créons un graphique simple dans le chart.html
fichier afin d'afficher les données de hashtag et de les mettre à jour en temps réel. Comme défini ci-dessous, nous devons importer le Chart.js
et jquery.min.js
Bibliothèques JavaScript.
Dans la balise body, nous devons créer un canevas et lui donner un identifiant afin de le référencer lors de l'affichage du graphique à l'aide de JavaScript à l'étape suivante.
Top Trending Twitter Hashtags Top Trending Twitter Hashtags
Maintenant, construisons le graphique à l'aide du code JavaScript ci-dessous. Tout d'abord, nous obtenons l'élément canvas, puis nous créons un nouvel objet de graphique et lui passons l'élément canvas et définissons son objet de données comme ci-dessous.
Notez que les étiquettes et les données des données sont délimitées par des étiquettes et des variables de valeurs qui sont renvoyées lors du rendu de la page lors de l'appel d'un get_chart_page
fonction dans le app.py
fichier.
La dernière partie restante est la fonction qui est configurée pour faire une requête Ajax toutes les secondes et appeler l'URL /refreshData
, qui exécutera refresh_graph_data
dans app.py
et renvoyez les nouvelles données mises à jour, puis mettez à jour le caractère qui restitue les nouvelles données.
comment les principes et éléments de conception visuelle sont-ils utilisés
var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);
Exécutons les trois applications dans l'ordre ci-dessous: 1. Client d'application Twitter. 2. Spark App. 3. Application Web de tableau de bord.
Ensuite, vous pouvez accéder au tableau de bord en temps réel à l'aide de l'URL
Maintenant, vous pouvez voir votre graphique en cours de mise à jour, comme ci-dessous:
Nous avons appris à effectuer des analyses de données simples sur les données en temps réel à l'aide de Spark Streaming et à les intégrer directement à un tableau de bord simple à l'aide d'un service Web RESTful. À partir de cet exemple, nous pouvons voir à quel point Spark est puissant, car il capture un flux massif de données, le transforme et extrait des informations précieuses qui peuvent être utilisées facilement pour prendre des décisions en un rien de temps. Il existe de nombreux cas d'utilisation utiles qui peuvent être mis en œuvre et qui peuvent servir différents secteurs, comme les actualités ou le marketing.
Exemple de l'industrie de l'information
Nous pouvons suivre les hashtags les plus fréquemment mentionnés pour savoir de quels sujets les gens parlent le plus sur les réseaux sociaux. En outre, nous pouvons suivre des hashtags spécifiques et leurs tweets afin de savoir ce que les gens disent sur des sujets ou des événements spécifiques dans le monde.
Exemple marketing
Nous pouvons collecter le flux de tweets et, en effectuant une analyse des sentiments, les catégoriser et déterminer les intérêts des personnes afin de les cibler avec des offres liées à leurs intérêts.
En outre, il existe de nombreux cas d'utilisation qui peuvent être appliqués spécifiquement pour l'analyse de données volumineuses et peuvent servir de nombreux secteurs. Pour plus de cas d'utilisation d'Apache Spark en général, je vous suggère de consulter l'un de nos postes précédents .
exemple de traitement parallèle par lots de printemps
Je vous encourage à en savoir plus sur Spark Streaming sur Ici afin d'en savoir plus sur ses capacités et d'effectuer une transformation plus avancée des données pour plus d'informations en temps réel en l'utilisant.
Il effectue un traitement rapide des données, une diffusion en continu et un apprentissage automatique à très grande échelle.
Il peut être utilisé dans la transformation de données, l'analyse prédictive et la détection de fraude sur les plateformes Big Data.
Twitter vous permet d'obtenir ses données en utilisant leurs API; l'un des moyens qu'ils mettent à disposition est de diffuser les tweets en temps réel sur des critères de recherche que vous définissez.