all queues now use same logger
This commit is contained in:
parent
d060d8e711
commit
5292c409ca
@ -1,13 +1,13 @@
|
|||||||
import pika
|
import pika
|
||||||
|
|
||||||
|
from functions import log
|
||||||
|
|
||||||
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials("user", "password")))
|
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', pika.PlainCredentials("user", "password")))
|
||||||
|
|
||||||
channel = connection.channel()
|
channel = connection.channel()
|
||||||
|
|
||||||
channel.queue_declare(queue='research_proposals')
|
channel.queue_declare(queue='research_proposals')
|
||||||
|
|
||||||
print("Waiting for proposals...")
|
|
||||||
|
|
||||||
def process_proposal(ch, method, properties, body):
|
def process_proposal(ch, method, properties, body):
|
||||||
proposal = body.decode()
|
proposal = body.decode()
|
||||||
acronym, title, description, amount = proposal.split(',')
|
acronym, title, description, amount = proposal.split(',')
|
||||||
@ -19,14 +19,19 @@ def process_proposal(ch, method, properties, body):
|
|||||||
body="approved")
|
body="approved")
|
||||||
project_info=f"{acronym},{title},{description},{amount}"
|
project_info=f"{acronym},{title},{description},{amount}"
|
||||||
channel.basic_publish(exchange='', routing_key='approved_projects', body=project_info)
|
channel.basic_publish(exchange='', routing_key='approved_projects', body=project_info)
|
||||||
print(f"{acronym} approved")
|
log(f"{acronym} approved")
|
||||||
else:
|
else:
|
||||||
channel.basic_publish(exchange='',
|
channel.basic_publish(exchange='',
|
||||||
routing_key=properties.reply_to,
|
routing_key=properties.reply_to,
|
||||||
properties=pika.BasicProperties(correlation_id=properties.correlation_id),
|
properties=pika.BasicProperties(correlation_id=properties.correlation_id),
|
||||||
body="rejected")
|
body="rejected")
|
||||||
print(f"{acronym} rejected")
|
log(f"{acronym} rejected")
|
||||||
|
|
||||||
channel.basic_consume(queue='research_proposals', on_message_callback=process_proposal, auto_ack=True)
|
channel.basic_consume(queue='research_proposals', on_message_callback=process_proposal, auto_ack=True)
|
||||||
|
|
||||||
channel.start_consuming()
|
try:
|
||||||
|
channel.start_consuming()
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
channel.stop_consuming()
|
||||||
|
connection.close()
|
||||||
|
|
||||||
|
|||||||
2
log.py
2
log.py
@ -15,10 +15,8 @@ def log(ch, method, properties, body):
|
|||||||
channel.basic_consume(queue='log', on_message_callback=log, auto_ack=True)
|
channel.basic_consume(queue='log', on_message_callback=log, auto_ack=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print("Opening logs")
|
|
||||||
channel.start_consuming()
|
channel.start_consuming()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
channel.stop_consuming()
|
channel.stop_consuming()
|
||||||
channel.close()
|
channel.close()
|
||||||
print("Closing logs")
|
|
||||||
|
|
||||||
|
|||||||
@ -23,7 +23,7 @@ def get_project(ch, method, properties, body):
|
|||||||
routing_key=properties.reply_to,
|
routing_key=properties.reply_to,
|
||||||
properties=pika.BasicProperties(correlation_id=properties.correlation_id),
|
properties=pika.BasicProperties(correlation_id=properties.correlation_id),
|
||||||
body=project)
|
body=project)
|
||||||
print(f"got project {project_id}")
|
log(f"got project {project_id}")
|
||||||
|
|
||||||
|
|
||||||
def save_project(ch, method, properties, body):
|
def save_project(ch, method, properties, body):
|
||||||
|
|||||||
@ -1,7 +1,8 @@
|
|||||||
import pika
|
import pika
|
||||||
|
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
|
from functions import log
|
||||||
|
|
||||||
class ResearchProposal(object):
|
class ResearchProposal(object):
|
||||||
|
|
||||||
@ -45,6 +46,6 @@ proposals = [
|
|||||||
|
|
||||||
for proposal in proposals:
|
for proposal in proposals:
|
||||||
proposal_client = ResearchProposal()
|
proposal_client = ResearchProposal()
|
||||||
print(f"{proposal['acronym']} sends proposal")
|
log(f"{proposal['acronym']} sends proposal")
|
||||||
response = proposal_client.call(proposal)
|
response = proposal_client.call(proposal)
|
||||||
print(f"{proposal['acronym']} got {response}")
|
log(f"{proposal['acronym']} got {response}")
|
||||||
|
|||||||
@ -1,6 +1,9 @@
|
|||||||
import os
|
|
||||||
import pika
|
import pika
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
from functions import log
|
||||||
|
|
||||||
if not os.path.exists("approved_projects"):
|
if not os.path.exists("approved_projects"):
|
||||||
os.makedirs("approved_projects")
|
os.makedirs("approved_projects")
|
||||||
|
|
||||||
@ -12,20 +15,20 @@ channel.queue_declare(queue='approved_projects')
|
|||||||
def process_approved_project(ch, method, properties, body):
|
def process_approved_project(ch, method, properties, body):
|
||||||
proposal = body.decode()
|
proposal = body.decode()
|
||||||
acronym, title, description, amount = proposal.split(',')
|
acronym, title, description, amount = proposal.split(',')
|
||||||
|
|
||||||
|
|
||||||
filename = os.path.join(os.getcwd(), 'approved_projects', f'{acronym}.txt')
|
filename = os.path.join(os.getcwd(), 'approved_projects', f'{acronym}.txt')
|
||||||
with open(filename, 'w') as f:
|
with open(filename, 'w') as f:
|
||||||
f.write(f'Acronym: {acronym}\n')
|
f.write(f'Acronym: {acronym}\n')
|
||||||
f.write(f'Researcher: {description}\n')
|
f.write(f'Researcher: {description}\n')
|
||||||
f.write(f'Budget: {amount}\n')
|
f.write(f'Budget: {amount}\n')
|
||||||
|
|
||||||
print(f'Approved project received: {body}')
|
log(f'Approved project received: {body}')
|
||||||
|
|
||||||
channel.basic_consume(queue='approved_projects', on_message_callback=process_approved_project, auto_ack=True)
|
channel.basic_consume(queue='approved_projects', on_message_callback=process_approved_project, auto_ack=True)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
print('University waiting for approved projects...')
|
log('University waiting for approved projects...')
|
||||||
channel.start_consuming()
|
channel.start_consuming()
|
||||||
except KeyboardInterrupt:
|
except KeyboardInterrupt:
|
||||||
channel.stop_consuming()
|
channel.stop_consuming()
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user