เว็บแอปพลิเคชันที่ทันสมัยและระบบพื้นฐานนั้นเร็วขึ้นและตอบสนองได้ดีกว่าที่เคยเป็นมา อย่างไรก็ตามยังมีอีกหลายกรณีที่คุณต้องการยกเลิกการทำงานหนักไปยังส่วนอื่น ๆ ของสถาปัตยกรรมระบบทั้งหมดของคุณแทนที่จะจัดการกับเธรดหลักของคุณ การระบุงานดังกล่าวทำได้ง่ายเพียงแค่ตรวจสอบว่างานนั้นอยู่ในประเภทใดประเภทหนึ่งต่อไปนี้:
วิธีแก้ปัญหาที่ตรงไปตรงมาในการรันงานเบื้องหลังคือการเรียกใช้งานภายในเธรดหรือกระบวนการแยกต่างหาก Python เป็นภาษาการเขียนโปรแกรม Turing ระดับสูงที่สมบูรณ์ซึ่งน่าเสียดายที่ไม่มีการใช้งานพร้อมกันในตัวในขนาดที่ตรงกับ Erlang, Go, Java, Scala หรือ Akka สิ่งเหล่านี้ขึ้นอยู่กับกระบวนการลำดับการสื่อสารของ Tony Hoare ( CSP ). ในทางกลับกันเธรด Python ได้รับการประสานงานและกำหนดเวลาโดยล็อคล่ามส่วนกลาง ( GIL ) ซึ่งป้องกันไม่ให้เธรดเนทีฟหลายเธรดรัน Python bytecodes พร้อมกัน การกำจัด GIL เป็นหัวข้อสนทนามากมาย นักพัฒนา Python แต่ไม่ใช่ประเด็นสำคัญของบทความนี้ การเขียนโปรแกรมพร้อมกันใน Python นั้นล้าสมัยแม้ว่าคุณจะอ่านเกี่ยวกับเรื่องนี้ได้ใน การสอน Python Multithreading โดยเพื่อน ApeeScapeer Marcus McCurdy ดังนั้นการออกแบบการสื่อสารระหว่างกระบวนการอย่างสม่ำเสมอจึงเป็นกระบวนการที่เกิดข้อผิดพลาดได้ง่ายและนำไปสู่การเชื่อมต่อรหัสและการบำรุงรักษาระบบที่ไม่ดีซึ่งไม่ต้องพูดถึงว่าจะส่งผลเสียต่อความสามารถในการปรับขนาด นอกจากนี้กระบวนการ Python เป็นกระบวนการปกติภายใต้ระบบปฏิบัติการ (OS) และด้วยไลบรารีมาตรฐาน Python ทั้งหมดจะกลายเป็นเฮฟวี่เวท เมื่อจำนวนกระบวนการในแอปเพิ่มขึ้นการเปลี่ยนจากกระบวนการดังกล่าวไปเป็นกระบวนการอื่นจะกลายเป็นการดำเนินการที่ใช้เวลานาน
เพื่อให้เข้าใจการทำงานพร้อมกันกับ Python ได้ดีขึ้นโปรดดูคำพูดที่น่าทึ่งนี้โดย David Beazley ที่ PyCon’15 .
ทางออกที่ดีกว่ามากคือการให้บริการไฟล์ กระจายคิว หรือกระบวนทัศน์พี่น้องที่รู้จักกันดีเรียกว่า เผยแพร่ - สมัครสมาชิก . ดังที่แสดงในรูปที่ 1 มีแอปพลิเคชั่นสองประเภทซึ่งหนึ่งเรียกว่า สำนักพิมพ์ ส่งข้อความและอื่น ๆ ที่เรียกว่า สมาชิก , รับข้อความ ตัวแทนทั้งสองไม่ได้โต้ตอบกันโดยตรงและไม่ได้รับรู้ถึงกันและกัน ผู้เผยแพร่ส่งข้อความไปยังคิวกลางหรือ นายหน้า และสมาชิกจะได้รับข้อความที่น่าสนใจจากโบรกเกอร์นี้ มีข้อดีสองประการในวิธีนี้:
มีระบบการส่งข้อความจำนวนมากที่สนับสนุนกระบวนทัศน์ดังกล่าวและจัดเตรียม API ที่เรียบร้อยซึ่งขับเคลื่อนด้วยโปรโตคอล TCP หรือ HTTP เช่น JMS, RabbitMQ, Redis Pub / Sub, Apache ActiveMQ เป็นต้น
ผักชีฝรั่ง เป็นหนึ่งในผู้จัดการงานเบื้องหลังที่ได้รับความนิยมมากที่สุดในโลก Python คื่นช่ายเข้ากันได้กับนายหน้าข้อความหลายรายเช่น RabbitMQ หรือ Redis และสามารถทำหน้าที่เป็นทั้งผู้ผลิตและผู้บริโภค
คื่นฉ่ายเป็นคิวงาน / คิวงานแบบอะซิงโครนัสตามการส่งข้อความแบบกระจาย เน้นการทำงานแบบเรียลไทม์ แต่รองรับการตั้งเวลาด้วย หน่วยการดำเนินการที่เรียกว่างานจะถูกดำเนินการพร้อมกันบนเซิร์ฟเวอร์ของผู้ปฏิบัติงานตั้งแต่หนึ่งเซิร์ฟเวอร์ขึ้นไปโดยใช้การประมวลผลหลายขั้นตอน Eventlet , หรือ ระบาย . งานสามารถดำเนินการแบบอะซิงโครนัส (ในพื้นหลัง) หรือพร้อมกัน (รอจนกว่าจะพร้อม) - โครงการขึ้นฉ่าย
ในการเริ่มต้นกับคื่นฉ่ายเพียงทำตามคำแนะนำทีละขั้นตอนที่ เอกสารอย่างเป็นทางการ .
จุดสำคัญของบทความนี้คือเพื่อให้คุณมีความเข้าใจที่ดีว่า Celery สามารถครอบคลุมกรณีการใช้งานใดได้บ้าง ในบทความนี้เราจะไม่เพียง แต่แสดงตัวอย่างที่น่าสนใจเท่านั้น แต่ยังพยายามเรียนรู้วิธีการนำผักชีฝรั่งไปใช้กับงานในโลกแห่งความเป็นจริงเช่นการส่งจดหมายพื้นหลังการสร้างรายงานการบันทึกและการรายงานข้อผิดพลาด ฉันจะแบ่งปันวิธีการทดสอบงานนอกเหนือจากการจำลองและในที่สุดฉันจะให้เทคนิคบางอย่างที่ไม่ได้บันทึกไว้ในเอกสารอย่างเป็นทางการซึ่งใช้เวลาหลายชั่วโมงในการค้นคว้าเพื่อค้นพบตัวเอง
วิธีการเริ่มต้นกองทุนอสังหาริมทรัพย์ไพรเวทอิควิตี้
หากคุณไม่เคยมีประสบการณ์กับผักชีฝรั่งมาก่อนฉันขอแนะนำให้คุณลองทำตามคำแนะนำอย่างเป็นทางการก่อน
หากบทความนี้ดึงดูดคุณและทำให้คุณต้องการเจาะลึกโค้ดทันทีให้ทำตามขั้นตอนนี้ ที่เก็บ GitHub สำหรับรหัสที่ใช้ในบทความนี้ README
ไฟล์จะให้แนวทางที่รวดเร็วและสกปรกสำหรับการเรียกใช้และเล่นกับแอปพลิเคชันตัวอย่าง
สำหรับผู้เริ่มต้นเราจะนำเสนอตัวอย่างที่ใช้งานได้จริงซึ่งจะแสดงให้ผู้อ่านเห็นว่าคื่นฉ่ายที่เรียบง่ายและสง่างามสามารถแก้งานที่ดูเหมือนจะไม่สำคัญได้อย่างไร ตัวอย่างทั้งหมดจะถูกนำเสนอภายในเฟรมเวิร์ก Django; อย่างไรก็ตามส่วนใหญ่สามารถย้ายไปยังเฟรมเวิร์ก Python อื่น ๆ ได้อย่างง่ายดาย (Flask, Pyramid)
เค้าโครงโครงการถูกสร้างขึ้นโดย Cookiecutter Django ; อย่างไรก็ตามฉันเก็บข้อมูลอ้างอิงไว้เพียงเล็กน้อยเท่านั้นซึ่งในความคิดของฉันช่วยอำนวยความสะดวกในการพัฒนาและจัดเตรียมกรณีการใช้งานเหล่านี้ ฉันยังลบโมดูลที่ไม่จำเป็นสำหรับโพสต์นี้และแอปพลิเคชันเพื่อลดเสียงรบกวนและทำให้เข้าใจรหัสได้ง่ายขึ้น
- celery_uncovered/ - celery_uncovered/__init__.py - celery_uncovered/{toyex,tricks,advex} - celery_uncovered/celery.py - config/settings/{base,local,test}.py - config/urls.py - manage.py
celery_uncovered/{toyex,tricks,advex}
มีแอพพลิเคชั่นต่างๆที่เราจะกล่าวถึงในโพสต์นี้ แต่ละแอปพลิเคชันประกอบด้วยชุดตัวอย่างที่จัดเรียงตามระดับความเข้าใจขึ้นฉ่ายที่ต้องการcelery_uncovered/celery.py
กำหนดตัวอย่างผักชีฝรั่ง ไฟล์: celery_uncovered/celery.py
:
from __future__ import absolute_import import os from celery import Celery, signals # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local') app = Celery('celery_uncovered') # Using a string here means the worker will not have to # pickle the object when using Windows. app.config_from_object('django.conf:settings', namespace='CELERY') app.autodiscover_tasks()
จากนั้นเราต้องมั่นใจว่าขึ้นฉ่ายจะเริ่มต้นพร้อมกับ Django ด้วยเหตุนี้เราจึงนำเข้าแอปใน celery_uncovered/__init__.py
ไฟล์: celery_uncovered/__init__.py
:
from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa __all__ = ['celery_app'] __version__ = '0.0.1' __version_info__ = tuple([int(num) if num.isdigit() else num for num in __version__.replace('-', '.', 1).split('.')])
config/settings
เป็นแหล่งที่มาของการกำหนดค่าแอพและคื่นฉ่ายของเรา ขึ้นอยู่กับสภาพแวดล้อมการดำเนินการ Django จะเปิดการตั้งค่าที่เกี่ยวข้อง: local.py
เพื่อการพัฒนาหรือ test.py
สำหรับการทดสอบ คุณสามารถกำหนดสภาพแวดล้อมของคุณเองได้หากต้องการโดยสร้างโมดูล python ใหม่ (เช่น prod.py
) การกำหนดค่าคื่นช่ายขึ้นต้นด้วย CELERY_
สำหรับโพสต์นี้ฉันกำหนดค่า RabbitMQ เป็นโบรกเกอร์และ SQLite เป็นผลลัพธ์ bac-end
ไฟล์: config/local.py
:
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest: [email protected] :5672//') CELERY_RESULT_BACKEND = 'django-db+sqlite:///results.sqlite'
กรณีแรกที่เราจะกล่าวถึงคือการสร้างรายงานและการส่งออก ในตัวอย่างนี้คุณจะได้เรียนรู้วิธีกำหนดงานที่สร้างรายงาน CSV และกำหนดเวลางานเป็นระยะ ๆ ด้วย คื่นฉ่าย .
ใช้คำอธิบายกรณี: ดึงที่เก็บที่ร้อนแรงที่สุดห้าร้อยแห่งจาก GitHub ต่อช่วงเวลาที่เลือก (วันสัปดาห์เดือน) จัดกลุ่มตามหัวข้อและส่งออกผลลัพธ์ไปยังไฟล์ CSV
หากเราให้บริการ HTTP ที่จะเรียกใช้คุณลักษณะนี้โดยคลิกปุ่มที่มีข้อความ 'สร้างรายงาน' แอปพลิเคชันจะหยุดทำงานและรอให้งานเสร็จสมบูรณ์ก่อนที่จะส่งการตอบกลับ HTTP กลับ นี้ไม่ดี. เราต้องการให้เว็บแอปพลิเคชันของเราทำงานได้อย่างรวดเร็วและเราไม่ต้องการให้ผู้ใช้ของเรารอในขณะที่ส่วนหลังของเราคำนวณผลลัพธ์ แทนที่จะรอให้ผลลัพธ์ออกมาเราอยากจัดคิวงานให้กับกระบวนการของผู้ปฏิบัติงานผ่านคิวที่ลงทะเบียนไว้ใน Celery และตอบกลับด้วย task_id
ไปยังส่วนหน้า จากนั้นส่วนหน้าจะใช้ task_id
เพื่อค้นหาผลลัพธ์ของงานในรูปแบบอะซิงโครนัส (เช่น AJAX) และจะแจ้งให้ผู้ใช้ทราบถึงความคืบหน้าของงาน ในที่สุดเมื่อกระบวนการเสร็จสิ้นผลลัพธ์สามารถใช้เป็นไฟล์เพื่อดาวน์โหลดผ่าน HTTP
ก่อนอื่นให้เราแยกย่อยกระบวนการออกเป็นหน่วยที่เล็กที่สุดเท่าที่จะเป็นไปได้และสร้างไปป์ไลน์:
การเรียกที่เก็บเป็นคำขอ HTTP โดยใช้ไฟล์ GitHub Search API GET /search/repositories
. อย่างไรก็ตามมีข้อ จำกัด ของบริการ GitHub API ที่ควรจัดการ: API ส่งคืนที่เก็บได้สูงสุด 100 ที่เก็บต่อคำขอแทนที่จะเป็น 500 เราสามารถส่งคำขอได้ทีละห้าคำขอ แต่เราไม่ต้องการให้ผู้ใช้รอ สำหรับแต่ละคำขอห้ารายการเนื่องจากคำขอ HTTP เป็นการดำเนินการที่ผูกกับ I / O แต่เราสามารถดำเนินการตามคำขอ HTTP ห้ารายการพร้อมกันด้วยพารามิเตอร์ของเพจที่เหมาะสม ดังนั้นหน้าจะอยู่ในช่วง [1..5] มากำหนดภารกิจที่เรียกว่า fetch_hot_repos/3 -> list
ใน toyex/tasks.py
โมดูล:
ไฟล์: celery_uncovered/toyex/local.py
@shared_task def fetch_hot_repos(since, per_page, page): payload = { 'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.format(date=since), 'per_page': per_page, 'page': page, 'access_token': settings.GITHUB_OAUTH} headers = {'Accept': 'application/vnd.github.v3+json'} connect_timeout, read_timeout = 5.0, 30.0 r = requests.get( 'https://api.github.com/search/repositories', params=payload, headers=headers, timeout=(connect_timeout, read_timeout)) items = r.json()[u'items'] return items
ดังนั้น fetch_hot_repos
สร้างคำขอไปยัง GitHub API และตอบกลับผู้ใช้ด้วยรายการที่เก็บ ได้รับพารามิเตอร์สามตัวที่จะกำหนดเพย์โหลดคำขอของเรา:
since
- กรองที่เก็บในวันที่สร้างper_page
- จำนวนผลลัพธ์ที่จะส่งคืนต่อคำขอ (จำกัด ที่ 100)page
—- หมายเลขหน้าที่ขอ (ในช่วง [1..5]) บันทึก: ในการใช้ GitHub Search API คุณจะต้องมี OAuth Token เพื่อผ่านการตรวจสอบการพิสูจน์ตัวตน ในกรณีของเราบันทึกไว้ในการตั้งค่าภายใต้ GITHUB_OAUTH
ต่อไปเราต้องกำหนดภารกิจหลักที่จะรับผิดชอบในการรวบรวมผลลัพธ์และส่งออกเป็นไฟล์ CSV: produce_hot_repo_report_task/2->filepath:
ไฟล์: celery_uncovered/toyex/local.py
@shared_task def produce_hot_repo_report(period, ref_date=None): # 1. parse date ref_date_str = strf_date(period, ref_date=ref_date) # 2. fetch and join fetch_jobs = group([ fetch_hot_repos.s(ref_date_str, 100, 1), fetch_hot_repos.s(ref_date_str, 100, 2), fetch_hot_repos.s(ref_date_str, 100, 3), fetch_hot_repos.s(ref_date_str, 100, 4), fetch_hot_repos.s(ref_date_str, 100, 5) ]) # 3. group by language and # 4. create csv return chord(fetch_jobs)(build_report_task.s(ref_date_str)).get() @shared_task def build_report_task(results, ref_date): all_repos = [] for repos in results: all_repos += [Repository(repo) for repo in repos] # 3. group by language grouped_repos = {} for repo in all_repos: if repo.language in grouped_repos: grouped_repos[repo.language].append(repo.name) else: grouped_repos[repo.language] = [repo.name] # 4. create csv lines = [] for lang in sorted(grouped_repos.keys()): lines.append([lang] + grouped_repos[lang]) filename = '{media}/github-hot-repos-{date}.csv'.format(media=settings.MEDIA_ROOT, date=ref_date) return make_csv(filename, lines)
งานนี้ใช้ celery.canvas.group
เพื่อเรียกใช้งานพร้อมกันห้าสายของ fetch_hot_repos/3
. ผลลัพธ์เหล่านั้นถูกรอและลดลงเป็นรายการของอ็อบเจ็กต์ที่เก็บ จากนั้นชุดผลลัพธ์ของเราจะถูกจัดกลุ่มตามหัวข้อและสุดท้ายส่งออกเป็นไฟล์ CSV ที่สร้างขึ้นภายใต้ MEDIA_ROOT/
ไดเรกทอรี
ข้อใดไม่ถูกต้องเกี่ยวกับการพัฒนาเว็บแอปพลิเคชัน
ในการจัดกำหนดการงานเป็นระยะคุณอาจต้องการเพิ่มรายการในรายการกำหนดการในไฟล์กำหนดค่า:
ไฟล์: config/local.py
from celery.schedules import crontab CELERY_BEAT_SCHEDULE = { 'produce-csv-reports': { 'task': 'celery_uncovered.toyex.tasks.produce_hot_repo_report_task', 'schedule': crontab(minute=0, hour=0) # midnight, 'args': ('today',) }, }
ในการเริ่มต้นและทดสอบวิธีการทำงานก่อนอื่นเราต้องเริ่มกระบวนการขึ้นฉ่าย:
$ celery -A celery_uncovered worker -l info
ต่อไปเราต้องสร้าง celery_uncovered/media/
ไดเรกทอรี จากนั้นคุณจะสามารถทดสอบการทำงานของมันได้ผ่านทาง Shell หรือ Celerybeat:
เปลือก :
from datetime import date from celery_uncovered.toyex.tasks import produce_hot_repo_report_task produce_hot_repo_report_task.delay('today').get(timeout=5)
ขึ้นฉ่าย :
# Start celerybeat with the following command $ celery -A celery_uncovered beat -l info
คุณสามารถรับชมผลลัพธ์ได้ที่ MEDIA_ROOT/
ไดเรกทอรี
หนึ่งในกรณีการใช้งาน Celery ที่พบบ่อยที่สุดคือการส่งการแจ้งเตือนทางอีเมล การแจ้งเตือนทางอีเมลเป็นการดำเนินการผูก I / O แบบออฟไลน์ที่ใช้ประโยชน์จากเซิร์ฟเวอร์ SMTP ภายในหรือ SES ของบุคคลที่สาม มีกรณีการใช้งานมากมายที่เกี่ยวข้องกับการส่งอีเมลและส่วนใหญ่ผู้ใช้ไม่จำเป็นต้องรอจนกว่ากระบวนการนี้จะเสร็จสิ้นก่อนที่จะได้รับการตอบกลับ HTTP นั่นเป็นเหตุผลว่าทำไมจึงควรดำเนินงานดังกล่าวในเบื้องหลังและตอบสนองต่อผู้ใช้ทันที
ใช้คำอธิบายกรณี: รายงานข้อผิดพลาด 50X ไปยังอีเมลของผู้ดูแลระบบผ่าน Celery
Python และ Django มีพื้นฐานที่จำเป็นในการดำเนินการ การบันทึกระบบ . ฉันจะไม่ลงรายละเอียดว่าการบันทึกของ Python ทำงานอย่างไร อย่างไรก็ตามหากคุณไม่เคยลองใช้มาก่อนหรือต้องการการทบทวนโปรดอ่านเอกสารประกอบในตัว การบันทึก โมดูล. คุณต้องการสิ่งนี้อย่างแน่นอนในสภาพแวดล้อมการผลิตของคุณ Django มีตัวจัดการคนตัดไม้พิเศษที่เรียกว่า AdminEmailHandler ผู้ดูแลระบบอีเมลสำหรับแต่ละข้อความบันทึกที่ได้รับ
แนวคิดหลักคือการขยาย send_mail
วิธีการของ AdminEmailHandler
ชั้นเรียนในลักษณะที่สามารถส่งจดหมายผ่านคื่นฉ่าย ซึ่งสามารถทำได้ดังภาพประกอบด้านล่าง:
ขั้นแรกเราต้องตั้งค่างานที่เรียกว่า report_error_task
ที่เรียก mail_admins
พร้อมหัวเรื่องและข้อความที่ระบุ:
ไฟล์: celery_uncovered/toyex/tasks.py
@shared_task def report_error_task(subject, message, *args, **kwargs): mail_admins(subject, message, *args, **kwargs)
ต่อไปเราจะขยาย AdminEmailHandler เพื่อที่ภายในจะเรียกเฉพาะงาน Celery ที่กำหนดไว้:
ไฟล์: celery_uncovered/toyex/admin_email.py
from django.utils.log import AdminEmailHandler from celery_uncovered.handlers.tasks import report_error_task class CeleryHandler(AdminEmailHandler): def send_mail(self, subject, message, *args, **kwargs): report_error_task.delay(subject, message, *args, **kwargs)
สุดท้ายเราต้องตั้งค่าการบันทึก การกำหนดค่าการเข้าสู่ระบบ Django ค่อนข้างตรงไปตรงมา สิ่งที่คุณต้องการคือการลบล้าง LOGGING
เพื่อให้เอ็นจินการบันทึกเริ่มต้นโดยใช้ตัวจัดการที่กำหนดใหม่:
ไฟล์ config/settings/local.py
LOGGING = { 'version': 1, 'disable_existing_loggers': False, ..., 'handlers': { ... 'mail_admins': { 'level': 'ERROR', 'filters': ['require_debug_true'], 'class': 'celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler' } }, 'loggers': { 'django': { 'handlers': ['console', 'mail_admins'], 'level': 'INFO', }, ... } }
สังเกตว่าฉันตั้งใจตั้งค่าตัวกรองตัวจัดการ require_debug_true
เพื่อทดสอบการทำงานนี้ในขณะที่แอปพลิเคชันกำลังทำงานในโหมดดีบัก
เพื่อทดสอบฉันได้เตรียมมุมมอง Django ที่ทำหน้าที่ 'หารด้วยศูนย์' ที่ localhost:8000/report-error
คุณต้องเริ่มคอนเทนเนอร์ MailHog Docker เพื่อทดสอบว่ามีการส่งอีเมลจริงหรือไม่
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ CELERY_TASKSK_ALWAYS_EAGER=False python manage.py runserver $ # with your browser navigate to [http://localhost:8000](http://localhost:8000) $ # now check your outgoing emails by vising web UI [http://localhost:8025](http://localhost:8025)
ในฐานะเครื่องมือทดสอบเมลฉันตั้งค่า MailHog และกำหนดค่าการส่งเมล Django เพื่อใช้สำหรับการส่ง SMTP มีหลายวิธีในการ ปรับใช้และเรียกใช้ MailHog ฉันตัดสินใจเลือกใช้ Docker container คุณสามารถค้นหารายละเอียดในไฟล์ README ที่เกี่ยวข้อง:
ไฟล์: docker/mailhog/README.md
$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:latest $ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog $ # navigate with your browser to localhost:8025
ในการกำหนดค่าแอปพลิเคชันของคุณให้ใช้ MailHog คุณต้องเพิ่มบรรทัดต่อไปนี้ในการกำหนดค่าของคุณ:
ไฟล์: config/settings/local.py
EMAIL_BACKEND = env('DJANGO_EMAIL_BACKEND', default='django.core.mail.backends.smtp.EmailBackend') EMAIL_PORT = 1025 EMAIL_HOST = env('EMAIL_HOST', default='mailhog')
งานขึ้นฉ่ายสามารถสร้างขึ้นจากฟังก์ชันที่เรียกได้ ตามค่าเริ่มต้นงานที่ผู้ใช้กำหนดเองจะถูกแทรกด้วย celery.app.task.Task
เป็นคลาสผู้ปกครอง (นามธรรม) คลาสนี้ประกอบด้วยฟังก์ชันการทำงานของการรันงานแบบอะซิงโครนัส (ส่งผ่านเครือข่ายไปยังผู้ปฏิบัติงานคื่นฉ่าย) หรือแบบซิงโครนัส (เพื่อวัตถุประสงค์ในการทดสอบ) การสร้างลายเซ็นและยูทิลิตี้อื่น ๆ อีกมากมาย ในตัวอย่างถัดไปเราจะพยายามขยาย Celery.app.task.Task
จากนั้นใช้เป็นคลาสพื้นฐานเพื่อเพิ่มพฤติกรรมที่เป็นประโยชน์บางอย่างให้กับงานของเรา
ในโครงการหนึ่งของฉันฉันกำลังพัฒนาแอปที่ให้ผู้ใช้ปลายทางมีเครื่องมือที่คล้าย Extract, Transform, Load (ETL) ซึ่งสามารถนำเข้าและกรองข้อมูลตามลำดับชั้นจำนวนมากได้ ส่วนหลังแบ่งออกเป็นสองโมดูล:
คื่นช่ายถูกนำไปใช้กับ Celerybeat หนึ่งอินสแตนซ์และคนงานมากกว่า 40 คน มีงานที่แตกต่างกันมากกว่ายี่สิบงานซึ่งประกอบด้วยกิจกรรมการวางท่อและการประสานงาน แต่ละงานดังกล่าวอาจล้มเหลวในบางจุด ความล้มเหลวทั้งหมดนี้ถูกทิ้งลงในบันทึกระบบของพนักงานแต่ละคน ในบางจุดเริ่มไม่สะดวกในการแก้จุดบกพร่องและรักษาชั้นผักชีฝรั่ง ในที่สุดเราตัดสินใจแยกบันทึกงานเป็นไฟล์เฉพาะงาน
ใช้คำอธิบายกรณี: ขยาย Celery เพื่อให้แต่ละงานบันทึกเอาต์พุตมาตรฐานและข้อผิดพลาดไปยังไฟล์
คื่นฉ่ายให้แอปพลิเคชัน Python พร้อมการควบคุมสิ่งที่ทำภายในได้ดีเยี่ยม มันมาพร้อมกับกรอบสัญญาณที่คุ้นเคย แอปพลิเคชั่นที่ใช้ Celery สามารถสมัครสมาชิกได้บางส่วนเพื่อเพิ่มพฤติกรรมของการกระทำบางอย่าง เราจะใช้ประโยชน์จากสัญญาณระดับงานเพื่อให้สามารถติดตามวงจรชีวิตของงานแต่ละงานได้อย่างละเอียด คื่นฉ่ายมักจะมาพร้อมกับส่วนหลังของการตัดไม้และเราจะใช้ประโยชน์จากมันในขณะที่มีเพียงไม่กี่แห่งเท่านั้นที่จะบรรลุเป้าหมายของเรา
คื่นช่ายรองรับการบันทึกต่องานแล้ว ในการบันทึกลงไฟล์จำเป็นต้องจัดส่งเอาต์พุตบันทึกไปยังตำแหน่งที่เหมาะสม ในกรณีของเราตำแหน่งที่เหมาะสมของงานคือไฟล์ที่ตรงกับชื่อของงาน ในอินสแตนซ์ Celery เราจะลบล้างการกำหนดค่าการบันทึกในตัวด้วยตัวจัดการการบันทึกที่สรุปแบบไดนามิก เป็นไปได้ที่จะสมัครรับ celeryd_after_setup
สัญญาณจากนั้นกำหนดค่าการบันทึกระบบที่นั่น:
ไฟล์: celery_uncovered/toyex/celery_conf.py
@signals.celeryd_after_setup.connect def configure_task_logging(instance=None, **kwargs): tasks = instance.app.tasks.keys() LOGS_DIR = settings.ROOT_DIR.path('logs') if not os.path.exists(str(LOGS_DIR)): os.makedirs(str(LOGS_DIR)) print 'dir created' default_handler = { 'level': 'DEBUG', 'filters': None, 'class': 'logging.FileHandler', 'filename': '' } default_logger = { 'handlers': [], 'level': 'DEBUG', 'propogate': True } LOG_CONFIG = { 'version': 1, # 'incremental': True, 'disable_existing_loggers': False, 'handlers': {}, 'loggers': {} } for task in tasks: task = str(task) if not task.startswith('celery_uncovered.'): continue task_handler = copy_dict(default_handler) task_handler['filename'] = str(LOGS_DIR.path(task + '.log')) task_logger = copy_dict(default_logger) task_logger['handlers'] = [task] LOG_CONFIG['handlers'][task] = task_handler LOG_CONFIG['loggers'][task] = task_logger logging.config.dictConfig(LOG_CONFIG)
โปรดสังเกตว่าสำหรับแต่ละงานที่ลงทะเบียนในแอพ Celery เรากำลังสร้างคนตัดไม้ที่สอดคล้องกันด้วยตัวจัดการ ตัวจัดการแต่ละตัวเป็นประเภท logging.FileHandler
ดังนั้นแต่ละอินสแตนซ์ดังกล่าวจึงได้รับชื่อไฟล์เป็นอินพุต สิ่งที่คุณต้องมีเพื่อให้ทำงานได้คือนำเข้าโมดูลนี้ไปที่ celery_uncovered/celery.py
ในตอนท้ายของไฟล์:
cfo ที่รับผิดชอบคืออะไร?
import celery_uncovered.tricks.celery_conf
สามารถรับผู้บันทึกงานเฉพาะได้โดยโทรไปที่ get_task_logger(task_name)
เพื่อที่จะสรุปพฤติกรรมดังกล่าวสำหรับแต่ละงานจำเป็นต้องขยาย celery.current_app.Task
เล็กน้อย ด้วยยูทิลิตี้บางวิธี:
ไฟล์: celery_uncovered/tricks/celery_ext.py
class LoggingTask(current_app.Task): abstract = True ignore_result = False @property def logger(self): logger = get_task_logger(self.name) return logger def log_msg(self, msg, *msg_args): self.logger.debug(msg, *msg_args)
ตอนนี้ในกรณีของการเรียกไปที่ task.log_msg('Hello, my name is: %s', task.request.id)
เอาต์พุตบันทึกจะถูกกำหนดเส้นทางไปยังไฟล์ที่เกี่ยวข้องภายใต้ชื่องาน
ในการเริ่มต้นและทดสอบว่างานนี้ทำงานอย่างไรให้เริ่มขั้นตอนขึ้นฉ่าย:
$ celery -A celery_uncovered worker -l info
จากนั้นคุณจะสามารถทดสอบการทำงานผ่าน Shell:
from datetime import date from celery_uncovered.tricks.tasks import add add.delay(1, 3)
สุดท้ายเพื่อดูผลลัพธ์ให้ไปที่ celery_uncovered/logs
ไดเร็กทอรีและเปิดล็อกไฟล์ที่เรียกว่า celery_uncovered.tricks.tasks.add.log
คุณอาจเห็นสิ่งที่คล้ายกันดังต่อไปนี้หลังจากเรียกใช้งานนี้หลายครั้ง:
วิธีตั้งโปรแกรมบอท
Result of 1 + 2 = 3 Result of 1 + 2 = 3 ...
ลองจินตนาการถึงแอปพลิเคชั่น Python สำหรับผู้ใช้ต่างประเทศที่สร้างขึ้นบน Celery และ Django ผู้ใช้สามารถกำหนดภาษา (โลแคล) ที่ใช้แอปพลิเคชันของคุณได้
คุณต้องออกแบบระบบการแจ้งเตือนทางอีเมลหลายภาษา ในการส่งการแจ้งเตือนทางอีเมลคุณได้ลงทะเบียนงานคื่นฉ่ายพิเศษที่จัดการตามคิวเฉพาะ งานนี้ได้รับอาร์กิวเมนต์ที่สำคัญบางส่วนเป็นอินพุตและตำแหน่งของผู้ใช้ปัจจุบันเพื่อให้อีเมลถูกส่งในภาษาที่ผู้ใช้เลือก
ตอนนี้ลองนึกดูว่าเรามีงานหลายอย่าง แต่แต่ละงานเหล่านั้นยอมรับข้อโต้แย้งของโลแคล ในกรณีนี้จะไม่เป็นการดีกว่าที่จะแก้ปัญหาในระดับที่สูงขึ้นของนามธรรม ที่นี่เราจะเห็นวิธีการทำ
ใช้คำอธิบายกรณี: สืบทอดขอบเขตจากบริบทการดำเนินการหนึ่งโดยอัตโนมัติและแทรกลงในบริบทการดำเนินการปัจจุบันเป็นพารามิเตอร์
เช่นเดียวกับที่เราทำกับการบันทึกงานเราต้องการขยายคลาสงานพื้นฐาน celery.current_app.Task
และแทนที่วิธีการบางอย่างที่รับผิดชอบในการเรียกงาน สำหรับจุดประสงค์ของการสาธิตนี้ฉันกำลังลบล้าง celery.current_app.Task::apply_async
วิธี. มีงานพิเศษสำหรับโมดูลนี้ที่จะช่วยให้คุณผลิตชิ้นส่วนทดแทนที่ทำงานได้อย่างสมบูรณ์
ไฟล์: celery_uncovered/tricks/celery_ext.py
class ScopeBasedTask(current_app.Task): abstract = True ignore_result = False default_locale_id = DEFAULT_LOCALE_ID scope_args = ('locale_id',) def __init__(self, *args, **kwargs): super(ScopeBasedTask, self).__init__(*args, **kwargs) self.set_locale(locale=kwargs.get('locale_id', None)) def set_locale(self, scenario_id=None): self.locale_id = self.default_locale_id if locale_id: self.locale_id = locale_id else: self.locale_id = get_current_locale().id def apply_async(self, args=None, kwargs=None, **other_kwargs): self.inject_scope_args(kwargs) return super(ScopeBasedTask, self).apply_async(args=args, kwargs=kwargs, **other_kwargs) def __call__(self, *args, **kwargs): task_rv = super(ScopeBasedTask, self).__call__(*args, **kwargs) return task_rv def inject_scope_args(self, kwargs): for arg in self.scope_args: if arg not in kwargs: kwargs[arg] = getattr(self, arg)
เบาะแสสำคัญคือการส่งผ่านโลแคลปัจจุบันเป็นอาร์กิวเมนต์คีย์ - ค่าไปยังงานการเรียกตามค่าเริ่มต้น หากงานถูกเรียกด้วยโลแคลที่แน่นอนเป็นอาร์กิวเมนต์งานนั้นจะไม่เปลี่ยนแปลง
ในการทดสอบฟังก์ชันนี้ให้เรากำหนดงานจำลองประเภท ScopeBasedTask
ค้นหาไฟล์ตามรหัสโลแคลและอ่านเนื้อหาเป็น JSON:
ไฟล์: celery_uncovered/tricks/tasks.py
@shared_task(bind=True, base=ScopeBasedTask) def read_scenario_file_task(self, **kwargs): fixture_parts = ['locales', 'sc_%i.json' % kwargs['scenario_id']] return read_fixture(*fixture_parts)
ตอนนี้สิ่งที่คุณต้องทำคือทำซ้ำขั้นตอนของการเปิด Celery เริ่มต้นเชลล์และทดสอบการทำงานของงานนี้ในสถานการณ์ต่างๆ การแข่งขันจะอยู่ใต้ celery_uncovered/tricks/fixtures/locales/
ไดเรกทอรี
โพสต์นี้มีวัตถุประสงค์เพื่อสำรวจคื่นฉ่ายจากมุมมองที่แตกต่างกัน ฉันสาธิตคื่นฉ่ายในตัวอย่างทั่วไปเช่นการส่งจดหมายและการสร้างรายงานตลอดจนเทคนิคที่ใช้ร่วมกันสำหรับกรณีการใช้งานทางธุรกิจเฉพาะที่น่าสนใจ คื่นฉ่ายสร้างขึ้นจากปรัชญาที่ขับเคลื่อนด้วยข้อมูลและทีมของคุณอาจทำให้ชีวิตของพวกเขาง่ายขึ้นมากโดยการแนะนำให้เป็นส่วนหนึ่งของระบบของพวกเขา การพัฒนาบริการที่ใช้คื่นฉ่ายนั้นไม่ซับซ้อนมากนักหากคุณมีประสบการณ์ Python ขั้นพื้นฐานและคุณควรจะรับบริการได้เร็วพอสมควร การกำหนดค่าเริ่มต้นนั้นดีเพียงพอสำหรับการใช้งานส่วนใหญ่ แต่หากจำเป็นก็สามารถยืดหยุ่นได้มาก
ทีมงานของเราได้เลือกใช้ Celery เป็นแบ็คเอนด์ของการประสานงานสำหรับงานเบื้องหลังและงานที่ดำเนินมายาวนาน เราใช้มันอย่างกว้างขวางสำหรับกรณีการใช้งานที่หลากหลายซึ่งมีเพียงไม่กี่คนเท่านั้นที่กล่าวถึงในโพสต์นี้ เรานำเข้าและวิเคราะห์ข้อมูลกิกะไบต์ทุกวัน แต่นี่เป็นเพียงจุดเริ่มต้นของเทคนิคการปรับขนาดตามแนวนอนเท่านั้น
คื่นฉ่ายเป็นหนึ่งในผู้จัดการงานเบื้องหลังที่ได้รับความนิยมมากที่สุดในโลก Python คื่นช่ายเข้ากันได้กับนายหน้าข้อความหลายรายเช่น RabbitMQ หรือ Redis และสามารถทำหน้าที่เป็นทั้งผู้ผลิตและผู้บริโภค
รูปแบบการสมัครสมาชิก (หรือผู้ผลิต - ผู้บริโภค) เป็นรูปแบบการส่งข้อความแบบกระจายในระบบคอมพิวเตอร์ที่ผู้เผยแพร่เผยแพร่ข้อความผ่านนายหน้าข้อความและผู้ติดตามรับฟังข้อความ ทั้งสองสามารถแยกส่วนประกอบของระบบโดยไม่ทราบหรือสื่อสารโดยตรงกับอีกเครื่องหนึ่ง