-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_node.py
More file actions
141 lines (115 loc) · 3.79 KB
/
data_node.py
File metadata and controls
141 lines (115 loc) · 3.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
from concurrent import futures
import time
import math
import logging
import grpc
import json
import route_guide_pb2
import route_guide_pb2_grpc
from search_functions import perform_search, add_documents
from argparse import ArgumentParser
import argparse
# Command-line interface: the only option is the port this data node listens
# on. The value is kept as a string because it is only ever interpolated into
# the listen address and the per-node JSON file name.
# The description previously read 'Query Node', which mislabelled this
# program in its --help output.
parser = argparse.ArgumentParser(description='Data Node')
parser.add_argument('--port', type=str, default="8081",
                    help='port for the data node')
args = parser.parse_args()
port = args.port
def decorator(func):
    """Return a wrapper around *func* that marks and flushes every call.

    The wrapper accepts positional arguments only, forwards them to *func*
    with ``end`` set to a space, an asterisk and a newline and with
    ``flush=True``, and always returns None.
    """
    def wrapped(*args):
        # The keyword arguments are fixed here; callers cannot override them.
        func(*args, end=' *\n', flush=True)

    return wrapped


# Rebind the module-level name so every later print call in this file goes
# through the wrapper above.
print = decorator(print)
class DataNode(route_guide_pb2_grpc.DataNodeServicer):
    """gRPC servicer that stores documents in memory and mirrors them to JSON.

    ``self.data`` is a list of documents; the ones this class creates are
    dicts with the keys ``docid``, ``title`` and ``content``. The list is
    persisted to ``data_<port>.json`` where ``port`` is the module-level
    port string.

    Writes and deletes happen in two steps: ``WriteRequest`` /
    ``DeleteRequest`` stage a change and answer ``AGREED`` or ``ABORT``;
    ``WriteReply`` / ``DeleteReply`` then apply the staged change on
    ``COMMIT`` or drop it on ``ABORT`` and answer ``ACK``.
    """

    def loadData(self):
        """Populate ``self.data`` from this node's JSON file.

        If the file cannot be read, is not valid JSON, or does not contain a
        JSON array, the dataset is reset to an empty list so the node can
        still start.
        """
        loaded = None
        try:
            with open("data_%s.json" % port, "r") as source:
                loaded = json.loads(source.read())
        except (OSError, ValueError):
            # OSError: missing/unreadable file. ValueError: malformed JSON
            # (json.JSONDecodeError) or undecodable bytes.
            loaded = None
        if not isinstance(loaded, list):
            # Every other method appends to / removes from / iterates this
            # object as a list, so anything else is treated as invalid.
            print("Dataset invalid, resetting")
            loaded = []
        self.data = loaded

    def writeData(self):
        """Overwrite this node's JSON file with the current ``self.data``."""
        with open("data_%s.json" % port, "w") as sink:
            sink.write(json.dumps(self.data))

    def __init__(self, query_master, query_backup):
        """Open channels to both query nodes and load the persisted dataset.

        Args:
            query_master: target address passed to ``grpc.insecure_channel``
                for the master query node.
            query_backup: target address passed to ``grpc.insecure_channel``
                for the backup query node.
        """
        super().__init__()
        self.query_master = grpc.insecure_channel(query_master)
        self.query_backup = grpc.insecure_channel(query_backup)
        self.loadData()
        print("INIT DATA TYPE : ", type(self.data))
        # TODO: the original note says this should come from loadData();
        # for now the counter restarts at 0 on every start.
        self.mid = 0
        self.commit_logs = []
        # Staged changes awaiting the coordinator's COMMIT/ABORT decision.
        self.write_doc = None
        self.to_remove = None

    def _find_doc(self, docid):
        """Return the first stored document whose ``docid`` equals *docid*, else None."""
        return next((doc for doc in self.data if doc["docid"] == docid), None)

    def AskQuery(self, request, context):
        """Yield each result of ``perform_search`` for ``request.query``."""
        yield from perform_search(request.query, self.data)

    def WriteRequest(self, request, context):
        """Stage a document write and vote on it.

        Returns:
            ``Status(content="AGREED")`` when ``add_documents`` returns True
            and nothing raised; ``Status(content="ABORT")`` otherwise.
        """
        # Pre-set so the vote below is well-defined even if add_documents
        # raises before assigning.
        status = False
        try:
            status = add_documents(self.commit_logs, request, self.data)
            self.write_doc = {
                "docid": request.docid,
                "title": request.title,
                "content": request.content,
            }
            self.mid = max(self.mid, request.docid) + 1
        except Exception as e:
            print("Exception : ", e)
            # A failed staging must not leave an older or half-built document
            # behind for a later COMMIT to pick up.
            status = False
            self.write_doc = None
        # add_documents' return type is not visible from this file, so the
        # original equality comparison is kept as-is.
        if status == True:  # noqa: E712
            return route_guide_pb2.Status(content="AGREED")
        return route_guide_pb2.Status(content="ABORT")

    def WriteReply(self, request, context):
        """Apply or drop the staged write according to ``request.content``.

        On ``COMMIT`` the staged document (if any) is appended to
        ``self.data`` and the dataset is written to disk. On ``ABORT`` the
        staged document is dropped; undoing via ``commit_logs`` is not
        implemented. Always returns ``Status(content="ACK")``.
        """
        decision = request.content
        if decision == "COMMIT":
            print("HERE, writ doc ", self.write_doc)
            if self.write_doc is not None:
                print("HERE inside if")
                self.data.append(self.write_doc)
            self.writeData()
        if decision in ("COMMIT", "ABORT"):
            # Clear the staged document so a repeated COMMIT cannot append
            # the same document twice.
            self.write_doc = None
        return route_guide_pb2.Status(content="ACK")

    def DeleteRequest(self, request, context):
        """Stage deletion of ``request.docid`` and vote on it.

        Returns:
            ``Status(content="AGREED")`` if a document with that docid is
            stored (the docid is remembered in ``self.to_remove``);
            ``Status(content="ABORT")`` otherwise.
        """
        print("To Delete : ", request.docid)
        if self._find_doc(request.docid) is None:
            return route_guide_pb2.Status(content="ABORT")
        self.to_remove = request.docid
        return route_guide_pb2.Status(content="AGREED")

    def DeleteReply(self, request, context):
        """Apply or drop the staged deletion according to ``request.content``.

        On ``COMMIT`` the document whose docid was staged by
        ``DeleteRequest`` is removed and the dataset is written to disk. If
        nothing is staged, or the docid is no longer present, nothing is
        removed and a message is printed. Always returns
        ``Status(content="ACK")``.
        """
        decision = request.content
        if decision == "COMMIT":
            target = None
            if self.to_remove is not None:
                target = self._find_doc(self.to_remove)
            if target is None:
                # Never fall back to removing some other document.
                print("Error in deleting after commit",
                      "docid not staged or not found: %s" % (self.to_remove,))
            else:
                self.data.remove(target)
            self.writeData()
        if decision in ("COMMIT", "ABORT"):
            self.to_remove = None
        return route_guide_pb2.Status(content="ACK")

    def FetchDocuments(self, request, content):
        """Return the stored document with ``request.docid``.

        When no such document exists, a placeholder ``Document`` with
        ``docid=-1`` is returned instead.
        """
        print('recvd')
        doc = self._find_doc(request.docid)
        if doc is None:
            return route_guide_pb2.Document(
                docid=-1, title="Not found", content="DOCID NOT FOUND")
        return route_guide_pb2.Document(
            docid=doc["docid"], title=doc['title'], content=doc["content"])

    def getMID(self, request, context):
        """Return the current ``self.mid`` counter wrapped in a ``DocumentId``."""
        return route_guide_pb2.DocumentId(docid=self.mid)
# Startup banner, printed as soon as the module body runs.
print("DATA NODE STARTED")

# Addresses of the master and backup query nodes. These are fixed to local
# defaults; the interactive prompts ("Enter Master IP : " /
# "Enter Backup IP : ") that used to supply them are disabled.
master_ip, backup_ip = "localhost:50051", "localhost:50052"
def serve():
    """Run the data node's gRPC server until it terminates.

    Builds a server backed by a 10-worker thread pool, registers a
    ``DataNode`` constructed with the module-level ``master_ip`` and
    ``backup_ip``, listens without TLS on ``[::]:<port>`` and then blocks in
    ``wait_for_termination``.
    """
    worker_pool = futures.ThreadPoolExecutor(max_workers=10)
    grpc_server = grpc.server(worker_pool)
    servicer = DataNode(master_ip, backup_ip)
    route_guide_pb2_grpc.add_DataNodeServicer_to_server(servicer, grpc_server)
    listen_address = '[::]:{}'.format(port)
    grpc_server.add_insecure_port(listen_address)
    grpc_server.start()
    grpc_server.wait_for_termination()


# Running the module starts the server immediately.
serve()