DJANGO-EB-SQS: DjangoアプリケーションがAmazon SQS(Simple Queue Service)と通信するための簡単な方法
トピック: バラクーダのエンジニアリング
2020年9月4日、Rohan Patil
Amazon ECS(Elastic Container Service)、Amazon S3(Simple Storage Service)、Amazon Kinesis、Amazon SQS、Amazon RDS(Relational Database Service)などのAWS(Amazon Web Services)サービスは世界中で広く使用されています。バラクーダは、Djangoフレームワーク上で開発したマイクロサービス内とマイクロサービス間のメッセージングを管理するために、Amazon SQSを使用しています。
Amazon SQSは、メッセージキューサービスであり、メッセージを損失することも、他のサービスを必要とすることもなく、ソフトウェアコンポーネント間でメッセージを任意の量で送信、保存、および受信できるサービスです。また、企業が、アプリケーションをデカップリングし、サービスをスケーリングできるように設計されており、マイクロサービスへのバラクーダの取り組みに最適なツールです。しかし、新規のDjangoベースのマイクロサービス、またはAmazon SQSによる既存のサービスのデカップリングには、Amazon SQSと通信するコードとロジックを複製する必要がありました。この結果、多くの反復コードが生じたため、バラクーダはGitHubライブラリDJANGO-EB-SQSを構築しました。
DJANGO-EB-SQSは開発者がAmazon SQSを既存または新規のDjangoベースのアプリケーションと迅速に統合できるように設計されたPythonライブラリです。このライブラリは下記のタスクを実行します。
- データをシリアライズする。
- 遅延ロジックを追加する。
- キューから連続ポーリングを実行する。
- Amazon SQSの標準に従ってデータをデシリアライズし、Amazon SQSとの通信にサードパーティのライブラリを使用する。
つまり、このライブラリはAmazon SQSとの通信に関わるすべての複雑性を抽象化するため、開発者はコアビジネスロジックのみに注力できます。
このライブラリはDjango ORMフレームワークとboto3ライブラリに基づいています。
使用方法
バラクーダはAI(人工知能)を使用してスピアフィッシングなどのソーシャルエンジニアリング攻撃を検出するメール保護ソリューションに取り組んでいます。バラクーダのソリューションは、お客様のOffice 365アカウントと統合されており、新しいメールを受信するたびに、通知を受信します。タスクの一つは新しいメールが不正ではないかどうかを判断することです。バラクーダのサービスの一つ(図1: サービス1)は、通知を受信すると、Graph APIによってOffice 365と通信し、メールを取得します。また、メールをさらに処理し、他のサービスに使用できるように、Amazon SQSキュー(図1: queue_1)にプッシュします。
図1
バラクーダのソリューションでこのライブラリがどのように使用されているかに関するわかりやすいユースケースは上記のとおりです。バラクーダのサービスの一つ(図1: サービス2)は、他のサービスに使用できるように、個別のメールからヘッダと機能セットを抽出します。
サービス2は、メールの本文を取得するライブラリの設定によって、queue_1をリッスンするように設定されています。
たとえば、サービス2は下記の処理を実行します。
(原文ページもご覧ください)
# queue_1からメールを使用
…
# メールからヘッダと機能セットを抽出
…
# タスクを送信
process_message.delay(tenant_id=, email_id=, headers=, tenant_id=, feature_set=, ….)
このprocess_messageメソッドは、同期的に呼び出されるのではなく、タスクとしてキューに追加され、ワーカーの一つによってピックアップされると、実行されます。このワーカーは、同じサービスのものであっても、別のサービスのものであってもかまいません。メソッドの呼び出し元は、基本的な動作、およびタスクの実行方法について心配する必要はありません。
process_messageメソッドをタスクとして定義する方法は下記のとおりです。
from eb_sqs.decorators import task
@task(queue_name=’queue_2′, max_retries=3)
def process_message(tenant_id: int, email_id: str, headers: List[dict], feature_set: List[dict], …) :
try:
# ヘッダと機能セットによって処理を実行
# 必要に応じて、タスクをキューに追加することもできる
except(OperationalError, InterfaceError) as exc:
try:
process_message.retry()
except MaxRetriesReachedException:
logger.error(‘MaxRetries reached for Service2:process_message ex: {exc}’)
タスクデコレータでメソッドを装飾すると、メッセージが、シリアライズされ、Amazon SQSキューにプッシュされる前に、呼び出し元のメソッド、ターゲットメソッド、引数、メタデータなどのデータが追加されます。このため、ワーカーの一つがキューから使用したメッセージは、どのメソッドを呼び出すか、どのパラメータを渡すかなど、タスクの実行に必要なすべての情報を含んでいます。
例外が発生した場合は、タスクを再試行することもできます。しかし、無限ループに陥らないように、オプションのパラメータmax_retriesを設定すると、最大再試行回数に達した後に、処理を停止できます。また、エラーをログに記録するか、タスクを配信不能キューに送信して、さらに分析できます。
Amazon SQSでは、メッセージの処理を最大15分遅延できます。delayパラメータを渡すと、同様の機能をタスクに追加できます。
process_message.delay(email_id=, headers=, …., delay=300) # 5分遅延
Djangoコマンドprocess_queueを実行すると、タスクを実行できます。このコマンドは、1つ以上のキューのリッスンをサポートしており、キューを無限に読み込み、タスクを実行します。
python manage.py process_queue –queues
このライブラリによってAmazon SQSキューによるサービス内またはサービス間の通信を簡単にする方法は上記のとおりです。
Djangoでこのライブラリを設定する方法、複数のキューをリッスンする機能、開発のセットアップ、および多くの機能の詳細については、こちらをご参照ください。
貢献
プロジェクトへの貢献については、DJANGO-EB-SQSをご参照ください。
原文はこちら:
DJANGO-EB-SQS: An easier way for Django applications to communicate with AWS SQS
September 4, 2020 Rohan Patil