From bc3f9f8dcfefde56b7fa5f5a622d463956deca13 Mon Sep 17 00:00:00 2001 From: Gaz Date: Mon, 10 Apr 2023 20:47:18 +0100 Subject: [PATCH] Refactored researcher.py. now recieves response --- funding_agency.py | 4 ++-- researcher.py | 54 +++++++++++++++++++++++++++++------------------ 2 files changed, 35 insertions(+), 23 deletions(-) diff --git a/funding_agency.py b/funding_agency.py index 2e0619e..fbcc396 100644 --- a/funding_agency.py +++ b/funding_agency.py @@ -16,7 +16,7 @@ def process_proposal(ch, method, properties, body): channel.basic_publish(exchange='', routing_key=properties.reply_to, properties=pika.BasicProperties(correlation_id=properties.correlation_id), - body=f"{acronym} approved") + body="approved") project_info=f"{acronym},{title},{description},{amount}" channel.basic_publish(exchange='', routing_key='approved_projects', body=project_info) print(f"{acronym} approved") @@ -24,7 +24,7 @@ def process_proposal(ch, method, properties, body): channel.basic_publish(exchange='', routing_key=properties.reply_to, properties=pika.BasicProperties(correlation_id=properties.correlation_id), - body=f"{acronym} rejected") + body="rejected") print(f"{acronym} rejected") channel.basic_consume(queue='research_proposals', on_message_callback=process_proposal, auto_ack=True) diff --git a/researcher.py b/researcher.py index 6c33c17..a251f29 100644 --- a/researcher.py +++ b/researcher.py @@ -1,17 +1,37 @@ import pika import uuid -connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials("user", "password"))) -channel = connection.channel() - -channel.queue_declare(queue='research_proposals') -def process_response(ch, method, properties, body): - if corr_id == properties.correlation_id: - print(body) - else: - print("response failed") +class ResearchProposal(object): + + def __init__(self): + self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials("user", "password"))) + self.channel = self.connection.channel() + result = self.channel.queue_declare(queue='', exclusive=True) + self.callback_queue = result.method.queue + self.channel.basic_consume(queue=self.callback_queue, on_message_callback=self.on_response, auto_ack=True) + self.response = None + self.corr_id = None + + + def on_response(self, ch, method, properties, body): + if self.corr_id == properties.correlation_id: + self.response = body + + def call(self, n): + self.response = None + self.corr_id = str(uuid.uuid4()) + self.channel.basic_publish(exchange='', routing_key="research_proposals", + properties=pika.BasicProperties(reply_to=self.callback_queue, + correlation_id=self.corr_id, ), + body=f"{n['acronym']},{n['title']},{n['description']},{n['amount']}") + self.connection.process_data_events(time_limit=None) + return str(self.response) + + + + proposals = [ {"acronym": "PROJ1", "title": "Project 1", "description": "Description 1", "amount": "300000"}, @@ -22,17 +42,9 @@ proposals = [ {"acronym": "PROJ6", "title": "Project 6", "description": "Description 6", "amount": "420000"} ] -corr_id = str(uuid.uuid4()) for proposal in proposals: - - response_queue = channel.queue_declare(queue='', exclusive=True).method.queue - - channel.basic_publish(exchange='', - routing_key='research_proposals', - properties=pika.BasicProperties(reply_to=response_queue, correlation_id=corr_id), - body=f"{proposal['acronym']},{proposal['title']},{proposal['description']},{proposal['amount']}") - print(f"Proposal {proposal['acronym']} sent") - channel.basic_consume(queue=response_queue, on_message_callback=process_response) - -connection.close() \ No newline at end of file + proposal_client = ResearchProposal() + print(f"{proposal['acronym']} sends proposal") + response = proposal_client.call(proposal) + print(f"{proposal['acronym']} got {response}") \ No newline at end of file