ปัจจุบันข้อมูลมีการเติบโตและสะสมเร็วขึ้นกว่าเดิม ปัจจุบันประมาณ 90% ของข้อมูลทั้งหมดที่สร้างขึ้นในโลกของเราถูกสร้างขึ้นในช่วงสองปีที่ผ่านมาเท่านั้น เนื่องจากอัตราการเติบโตที่สูงขึ้นนี้แพลตฟอร์มข้อมูลขนาดใหญ่จึงต้องใช้โซลูชันที่รุนแรงเพื่อรักษาข้อมูลจำนวนมหาศาลไว้
หนึ่งในแหล่งข้อมูลหลักในปัจจุบันคือเครือข่ายสังคม ให้ฉันสาธิตตัวอย่างในชีวิตจริง: จัดการวิเคราะห์และดึงข้อมูลเชิงลึกจากข้อมูลโซเชียลเน็ตเวิร์กแบบเรียลไทม์โดยใช้โซลูชันการสะท้อนข้อมูลขนาดใหญ่ที่สำคัญที่สุดอย่างหนึ่งนั่นคือ Apache Spark และ Python
ในบทความนี้ฉันจะสอนวิธีสร้างแอปพลิเคชั่นง่ายๆที่อ่านสตรีมออนไลน์จาก Twitter โดยใช้ Python จากนั้นประมวลผลทวีตโดยใช้ Apache Spark Streaming เพื่อระบุแฮชแท็กและสุดท้ายส่งกลับแฮชแท็กที่กำลังมาแรงและแสดงข้อมูลนี้ให้เป็นจริง - แผงควบคุมเวลา
คุณต้องลงทะเบียนเพื่อรับทวีตจาก Twitter TwitterApps โดยคลิกที่ 'สร้างแอปใหม่' จากนั้นกรอกแบบฟอร์มด้านล่างคลิกที่ 'สร้างแอป Twitter ของคุณ'
อย่างที่สองไปที่แอพที่คุณสร้างขึ้นใหม่และเปิดแท็บ“ คีย์และโทเค็นการเข้าถึง” จากนั้นคลิกที่“ สร้างโทเค็นการเข้าถึงของฉัน”
โทเค็นการเข้าถึงใหม่ของคุณจะปรากฏดังต่อไปนี้
และตอนนี้คุณก็พร้อมสำหรับขั้นตอนต่อไป
ในขั้นตอนนี้ฉันจะแสดงวิธีสร้างไคลเอนต์ง่ายๆที่จะรับทวีตจาก Twitter API โดยใช้ Python และส่งต่อไปยังอินสแตนซ์ Spark Streaming ควรทำตามได้ง่ายสำหรับมืออาชีพ นักพัฒนา Python .
ก่อนอื่นมาสร้างไฟล์ชื่อ twitter_app.py
จากนั้นเราจะเพิ่มรหัสเข้าด้วยกันด้านล่าง
นำเข้าไลบรารีที่เราจะใช้ดังต่อไปนี้:
import socket import sys import requests import requests_oauthlib import json
และเพิ่มตัวแปรที่จะใช้ใน OAuth สำหรับเชื่อมต่อกับ Twitter ดังนี้:
# Replace the values below with yours ACCESS_TOKEN = 'YOUR_ACCESS_TOKEN' ACCESS_SECRET = 'YOUR_ACCESS_SECRET' CONSUMER_KEY = 'YOUR_CONSUMER_KEY' CONSUMER_SECRET = 'YOUR_CONSUMER_SECRET' my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET,ACCESS_TOKEN, ACCESS_SECRET)
ตอนนี้เราจะสร้างฟังก์ชันใหม่ชื่อ get_tweets
ซึ่งจะเรียก URL API ของ Twitter และส่งคืนการตอบสนองสำหรับสตรีมทวีต
def get_tweets(): url = 'https://stream.twitter.com/1.1/statuses/filter.json' query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','#')] query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data]) response = requests.get(query_url, auth=my_auth, stream=True) print(query_url, response) return response
จากนั้นสร้างฟังก์ชันที่รับการตอบสนองจากฟังก์ชันด้านบนและแยกข้อความของทวีตออกจากออบเจ็กต์ JSON ของทวีตทั้งหมด หลังจากนั้นจะส่งทวีตทั้งหมดไปยังอินสแตนซ์ Spark Streaming (จะกล่าวถึงในภายหลัง) ผ่านการเชื่อมต่อ TCP
def send_tweets_to_spark(http_resp, tcp_connection): for line in http_resp.iter_lines(): try: full_tweet = json.loads(line) tweet_text = full_tweet['text'] print('Tweet Text: ' + tweet_text) print ('------------------------------------------') tcp_connection.send(tweet_text + '
') except: e = sys.exc_info()[0] print('Error: %s' % e)
ตอนนี้เราจะสร้างส่วนหลักซึ่งจะทำให้การเชื่อมต่อซ็อกเก็ตโฮสต์ของแอปที่จุดประกายจะเชื่อมต่อด้วย เราจะกำหนดค่า IP ที่นี่ให้เป็น localhost
เนื่องจากทั้งหมดจะทำงานบนเครื่องเดียวกันและพอร์ต 9009
. จากนั้นเราจะเรียก get_tweets
วิธีการที่เราทำไว้ข้างต้นเพื่อรับทวีตจาก Twitter และส่งการตอบกลับพร้อมกับการเชื่อมต่อซ็อกเก็ตไปยัง send_tweets_to_spark
สำหรับการส่งทวีตไปยัง Spark
TCP_IP = 'localhost' TCP_PORT = 9009 conn = None s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.bind((TCP_IP, TCP_PORT)) s.listen(1) print('Waiting for TCP connection...') conn, addr = s.accept() print('Connected... Starting getting tweets.') resp = get_tweets() send_tweets_to_spark(resp, conn)
มาสร้างแอปสตรีมมิ่ง Spark ของเราที่จะประมวลผลแบบเรียลไทม์สำหรับทวีตที่เข้ามาแยกแฮชแท็กออกจากแอปและคำนวณจำนวนแฮชแท็กที่ถูกกล่าวถึง
วิธีการสร้างต้นแบบ
ก่อนอื่นเราต้องสร้างอินสแตนซ์ของ Spark Context sc
จากนั้นเราจึงสร้างบริบทการสตรีม ssc
จาก sc
ด้วยช่วงแบตช์สองวินาทีที่จะทำการเปลี่ยนแปลงในสตรีมทั้งหมดที่ได้รับทุกๆสองวินาที สังเกตว่าเราได้ตั้งค่าระดับการบันทึกเป็น ERROR
เพื่อปิดใช้งานบันทึกส่วนใหญ่ที่ Spark เขียน
เรากำหนดจุดตรวจที่นี่เพื่ออนุญาตให้มีการตรวจสอบ RDD เป็นระยะ สิ่งนี้จำเป็นต้องใช้ในแอปของเราเนื่องจากเราจะใช้การแปลงสภาพ (จะกล่าวถึงในหัวข้อเดียวกันในภายหลัง)
จากนั้นเรากำหนด DStream dataStream หลักของเราที่จะเชื่อมต่อกับเซิร์ฟเวอร์ซ็อกเก็ตที่เราสร้างขึ้นก่อนที่พอร์ต 9009
และอ่านทวีตจากพอร์ตนั้น แต่ละระเบียนใน DStream จะเป็นทวีต
from pyspark import SparkConf,SparkContext from pyspark.streaming import StreamingContext from pyspark.sql import Row,SQLContext import sys import requests # create spark configuration conf = SparkConf() conf.setAppName('TwitterStreamApp') # create spark context with the above configuration sc = SparkContext(conf=conf) sc.setLogLevel('ERROR') # create the Streaming Context from the above spark context with interval size 2 seconds ssc = StreamingContext(sc, 2) # setting a checkpoint to allow RDD recovery ssc.checkpoint('checkpoint_TwitterApp') # read data from port 9009 dataStream = ssc.socketTextStream('localhost',9009)
ตอนนี้เราจะกำหนดตรรกะการเปลี่ยนแปลงของเรา ก่อนอื่นเราจะแยกทวีตทั้งหมดออกเป็นคำ ๆ และใส่เป็นคำ RDD จากนั้นเราจะกรองเฉพาะแฮชแท็กจากทุกคำและจับคู่กับ (hashtag, 1)
และใส่แฮชแท็ก RDD
จากนั้นเราต้องคำนวณจำนวนครั้งที่แฮชแท็กถูกพูดถึง เราสามารถทำได้โดยใช้ฟังก์ชัน reduceByKey
ฟังก์ชันนี้จะคำนวณจำนวนครั้งที่แฮชแท็กถูกกล่าวถึงต่อแต่ละชุดนั่นคือจะรีเซ็ตจำนวนในแต่ละชุด
ในกรณีของเราเราจำเป็นต้องคำนวณจำนวนในแบทช์ทั้งหมดดังนั้นเราจะใช้ฟังก์ชันอื่นที่เรียกว่า updateStateByKey
เนื่องจากฟังก์ชันนี้ช่วยให้คุณสามารถรักษาสถานะของ RDD ในขณะที่อัปเดตด้วยข้อมูลใหม่ ทางนี้เรียกว่า Stateful Transformation
.
โปรดทราบว่าในการใช้ updateStateByKey
คุณต้องกำหนดค่าจุดตรวจและสิ่งที่เราได้ทำในขั้นตอนก่อนหน้านี้
# split each tweet into words words = dataStream.flatMap(lambda line: line.split(' ')) # filter the words to get only hashtags, then map each hashtag to be a pair of (hashtag,1) hashtags = words.filter(lambda w: '#' in w).map(lambda x: (x, 1)) # adding the count of each hashtag to its last count tags_totals = hashtags.updateStateByKey(aggregate_tags_count) # do processing for each RDD generated in each interval tags_totals.foreachRDD(process_rdd) # start the streaming computation ssc.start() # wait for the streaming to finish ssc.awaitTermination()
updateStateByKey
รับฟังก์ชันเป็นพารามิเตอร์ที่เรียกว่า update
ฟังก์ชัน มันทำงานในแต่ละรายการใน RDD และทำตามตรรกะที่ต้องการ
ในกรณีของเราเราได้สร้างฟังก์ชันการอัปเดตชื่อ aggregate_tags_count
ซึ่งจะรวมทั้งหมด new_values
สำหรับแฮชแท็กแต่ละรายการและเพิ่มลงในแท็ก total_sum
นั่นคือผลรวมของแบทช์ทั้งหมดและบันทึกข้อมูลลงใน tags_totals
RDD.
def aggregate_tags_count(new_values, total_sum): return sum(new_values) + (total_sum or 0)
จากนั้นเราจะทำการประมวลผลบน tags_totals
RDD ในทุกชุดเพื่อแปลงเป็นตาราง temp โดยใช้ Spark SQL Context จากนั้นดำเนินการคำสั่งเลือกเพื่อดึงแฮชแท็กสิบอันดับแรกพร้อมจำนวนและใส่ลงใน hashtag_counts_df
กรอบข้อมูล
def get_sql_context_instance(spark_context): if ('sqlContextSingletonInstance' not in globals()): globals()['sqlContextSingletonInstance'] = SQLContext(spark_context) return globals()['sqlContextSingletonInstance'] def process_rdd(time, rdd): print('----------- %s -----------' % str(time)) try: # Get spark sql singleton context from the current context sql_context = get_sql_context_instance(rdd.context) # convert the RDD to Row RDD row_rdd = rdd.map(lambda w: Row(hashtag=w[0], hashtag_count=w[1])) # create a DF from the Row RDD hashtags_df = sql_context.createDataFrame(row_rdd) # Register the dataframe as table hashtags_df.registerTempTable('hashtags') # get the top 10 hashtags from the table using SQL and print them hashtag_counts_df = sql_context.sql('select hashtag, hashtag_count from hashtags order by hashtag_count desc limit 10') hashtag_counts_df.show() # call this method to prepare top 10 hashtags DF and send them send_df_to_dashboard(hashtag_counts_df) except: e = sys.exc_info()[0] print('Error: %s' % e)
ขั้นตอนสุดท้ายในแอปพลิเคชัน Spark ของเราคือการส่ง hashtag_counts_df
กรอบข้อมูลไปยังแอปพลิเคชันแดชบอร์ด ดังนั้นเราจะแปลงเฟรมข้อมูลเป็นสองอาร์เรย์หนึ่งสำหรับแฮชแท็กและอีกอันสำหรับการนับ จากนั้นเราจะส่งไปยังแอปพลิเคชันแดชบอร์ดผ่าน REST API
def send_df_to_dashboard(df): # extract the hashtags from dataframe and convert them into array top_tags = [str(t.hashtag) for t in df.select('hashtag').collect()] # extract the counts from dataframe and convert them into array tags_count = [p.hashtag_count for p in df.select('hashtag_count').collect()] # initialize and send the data through REST API url = 'http://localhost:5001/updateData' request_data = {'label': str(top_tags), 'data': str(tags_count)} response = requests.post(url, data=request_data)
สุดท้ายนี่คือตัวอย่างผลลัพธ์ของ Spark Streaming ในขณะที่รันและพิมพ์ hashtag_counts_df
คุณจะสังเกตเห็นว่าเอาต์พุตจะถูกพิมพ์ทุกๆสองวินาทีตามช่วงเวลาของแบตช์
ตอนนี้เราจะสร้างแอปพลิเคชันแดชบอร์ดง่ายๆที่ Spark จะอัปเดตแบบเรียลไทม์ เราจะสร้างโดยใช้ Python, Flask และ Charts.js .
ขั้นแรกมาสร้างโปรเจ็กต์ Python ที่มีโครงสร้างด้านล่างแล้วดาวน์โหลดและเพิ่มไฟล์ Chart.js ไฟล์ลงในไดเร็กทอรีแบบคงที่
จากนั้นใน app.py
ไฟล์เราจะสร้างฟังก์ชันชื่อ update_data
ซึ่ง Spark จะเรียกใช้ผ่าน URL http://localhost:5001/updateData
เพื่ออัปเดตป้ายกำกับส่วนกลางและอาร์เรย์ค่า
นอกจากนี้ฟังก์ชัน refresh_graph_data
ถูกสร้างขึ้นเพื่อเรียกโดยคำขอ AJAX เพื่อส่งคืนเลเบลและอาร์เรย์ค่าที่อัปเดตใหม่เป็น JSON ฟังก์ชั่น get_chart_page
จะแสดง chart.html
หน้าเมื่อเรียก
from flask import Flask,jsonify,request from flask import render_template import ast app = Flask(__name__) labels = [] values = [] @app.route('/') def get_chart_page(): global labels,values labels = [] values = [] return render_template('chart.html', values=values, labels=labels) @app.route('/refreshData') def refresh_graph_data(): global labels, values print('labels now: ' + str(labels)) print('data now: ' + str(values)) return jsonify(sLabel=labels, sData=values) @app.route('/updateData', methods=['POST']) def update_data(): global labels, values if not request.form or 'data' not in request.form: return 'error',400 labels = ast.literal_eval(request.form['label']) values = ast.literal_eval(request.form['data']) print('labels received: ' + str(labels)) print('data received: ' + str(values)) return 'success',201 if __name__ == '__main__': app.run(host='localhost', port=5001)
ตอนนี้เรามาสร้างแผนภูมิง่ายๆใน chart.html
เพื่อแสดงข้อมูลแฮชแท็กและอัปเดตแบบเรียลไทม์ ตามที่กำหนดไว้ด้านล่างเราจำเป็นต้องนำเข้า Chart.js
และ jquery.min.js
ไลบรารี JavaScript
ในแท็กเนื้อหาเราต้องสร้างผ้าใบและให้ ID เพื่ออ้างอิงในขณะที่แสดงแผนภูมิโดยใช้ JavaScript ในขั้นตอนถัดไป
Top Trending Twitter Hashtags Top Trending Twitter Hashtags
ตอนนี้เรามาสร้างแผนภูมิโดยใช้โค้ด JavaScript ด้านล่าง ขั้นแรกเราได้รับองค์ประกอบผืนผ้าใบจากนั้นเราสร้างวัตถุแผนภูมิใหม่และส่งผ่านองค์ประกอบผืนผ้าใบไปที่มันและกำหนดวัตถุข้อมูลดังต่อไปนี้
โปรดทราบว่าป้ายกำกับและข้อมูลของข้อมูลถูกล้อมรอบด้วยเลเบลและตัวแปรค่าที่ส่งกลับขณะแสดงผลเพจเมื่อเรียก get_chart_page
ในฟังก์ชัน app.py
ไฟล์.
ส่วนสุดท้ายที่เหลือคือฟังก์ชันที่กำหนดค่าให้ทำคำขอ Ajax ทุกวินาทีและเรียก URL /refreshData
ซึ่งจะดำเนินการ refresh_graph_data
ใน app.py
และส่งคืนข้อมูลที่อัปเดตใหม่จากนั้นอัปเดตถ่านที่แสดงข้อมูลใหม่
var ctx = document.getElementById('chart'); var myChart = new Chart(ctx, { type: 'horizontalBar', data: { labels: [{% for item in labels %} '{{item}}', {% endfor %}], datasets: [{ label: '# of Mentions', data: [{% for item in values %} {{item}}, {% endfor %}], backgroundColor: [ 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)', 'rgba(255, 159, 64, 0.2)', 'rgba(255, 99, 132, 0.2)', 'rgba(54, 162, 235, 0.2)', 'rgba(255, 206, 86, 0.2)', 'rgba(75, 192, 192, 0.2)', 'rgba(153, 102, 255, 0.2)' ], borderColor: [ 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)', 'rgba(255, 159, 64, 1)', 'rgba(255,99,132,1)', 'rgba(54, 162, 235, 1)', 'rgba(255, 206, 86, 1)', 'rgba(75, 192, 192, 1)', 'rgba(153, 102, 255, 1)' ], borderWidth: 1 }] }, options: { scales: { yAxes: [{ ticks: { beginAtZero:true } }] } } }); var src_Labels = []; var src_Data = []; setInterval(function(){ $.getJSON('/refreshData', { }, function(data) { src_Labels = data.sLabel; src_Data = data.sData; }); myChart.data.labels = src_Labels; myChart.data.datasets[0].data = src_Data; myChart.update(); },1000);
มาเรียกใช้แอปพลิเคชั่นสามตัวตามลำดับด้านล่าง: 1. ไคลเอนต์แอป Twitter 2. แอป Spark 3. Dashboard Web App
จากนั้นคุณสามารถเข้าถึงแดชบอร์ดแบบเรียลไทม์โดยใช้ URL
ตอนนี้คุณสามารถเห็นแผนภูมิของคุณได้รับการอัปเดตดังต่อไปนี้:
เราได้เรียนรู้วิธีการวิเคราะห์ข้อมูลแบบง่าย ๆ เกี่ยวกับข้อมูลแบบเรียลไทม์โดยใช้ Spark Streaming และรวมเข้ากับแดชบอร์ดแบบง่ายโดยใช้บริการเว็บ RESTful จากตัวอย่างนี้เราจะเห็นได้ว่า Spark มีประสิทธิภาพเพียงใดเนื่องจากรวบรวมข้อมูลจำนวนมหาศาลแปลงข้อมูลและดึงข้อมูลเชิงลึกที่มีค่าซึ่งสามารถใช้ในการตัดสินใจได้อย่างง่ายดายในเวลาอันรวดเร็ว มีกรณีการใช้งานที่เป็นประโยชน์มากมายที่สามารถนำไปใช้และรองรับอุตสาหกรรมต่างๆเช่นข่าวสารหรือการตลาด
ตัวอย่างอุตสาหกรรมข่าว
เราสามารถติดตามแฮชแท็กที่ถูกกล่าวถึงบ่อยที่สุดเพื่อทราบว่าหัวข้อใดที่ผู้คนพูดถึงมากที่สุดบนโซเชียลมีเดีย นอกจากนี้เรายังสามารถติดตามแฮชแท็กและทวีตของพวกเขาเพื่อที่จะได้ทราบว่าผู้คนพูดถึงหัวข้อหรือเหตุการณ์ที่เฉพาะเจาะจงในโลกอย่างไร
console.error(`เป็นข้อผิดพลาดที่ทราบว่าสามารถทำลาย npm โปรดอัปเดตเป็นอย่างน้อย ${r
ตัวอย่างการตลาด
เราสามารถรวบรวมสตรีมของทวีตและโดยทำการวิเคราะห์ความรู้สึกจัดหมวดหมู่และกำหนดความสนใจของผู้คนเพื่อกำหนดเป้าหมายด้วยข้อเสนอที่เกี่ยวข้องกับความสนใจของพวกเขา
นอกจากนี้ยังมีกรณีการใช้งานจำนวนมากที่สามารถนำไปใช้โดยเฉพาะสำหรับการวิเคราะห์ข้อมูลขนาดใหญ่และสามารถให้บริการในหลายอุตสาหกรรม สำหรับกรณีการใช้งานทั่วไปของ Apache Spark เพิ่มเติมฉันขอแนะนำให้คุณตรวจสอบหนึ่งใน โพสต์ก่อนหน้านี้ .
ขอแนะนำให้คุณอ่านเพิ่มเติมเกี่ยวกับ Spark Streaming จาก ที่นี่ เพื่อที่จะทราบข้อมูลเพิ่มเติมเกี่ยวกับความสามารถของมันและทำการเปลี่ยนแปลงขั้นสูงกับข้อมูลเพื่อให้ได้ข้อมูลเชิงลึกมากขึ้นในแบบเรียลไทม์โดยใช้
ประมวลผลข้อมูลสตรีมมิ่งและแมชชีนเลิร์นนิงได้อย่างรวดเร็วในระดับที่ใหญ่มาก
สามารถใช้ในการแปลงข้อมูลการวิเคราะห์เชิงคาดการณ์และการตรวจจับการฉ้อโกงบนแพลตฟอร์มข้อมูลขนาดใหญ่
Twitter ช่วยให้คุณได้รับข้อมูลโดยใช้ API วิธีหนึ่งที่ทำให้พร้อมใช้งานคือการสตรีมทวีตแบบเรียลไทม์ตามเกณฑ์การค้นหาที่คุณกำหนด