-
Notifications
You must be signed in to change notification settings - Fork 0
/
FSLeafQueue.py
146 lines (107 loc) · 4.9 KB
/
FSLeafQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
'''
Created on Jan 12, 2015
@author: niuzhaojie
'''
from FSQueue import FSQueue
from Resources import Resources
from AppSchedulable import AppSchedulable
from policies.PolicyParser import PolicyParser
import math
class FSLeafQueue(FSQueue):
'''
classdocs
'''
def __init__(self, name, parent, scheduler):
'''
Constructor
'''
super(FSLeafQueue, self).__init__(name, parent, scheduler)
self._appScheds = []
self._demand = Resources.createResource(0, 0, 0, 0)
def addApp(self, app):
appSchedulable = AppSchedulable(self._scheduler, app, self)
app.setAppSchedulable(appSchedulable)
self._appScheds.append(appSchedulable)
def removeApp(self, app):
appSchedulableToRemove = None
for appSchedulable in self._appScheds:
if appSchedulable.getApp() == app:
appSchedulableToRemove = appSchedulable
if appSchedulableToRemove != None:
self._appScheds.remove(appSchedulableToRemove)
def getAppSchedulables(self):
return self._appScheds
def getChildQueues(self):
return []
def recomputeShares(self):
self._policy.computeShares(self._appScheds, self.getFairShare())
def getDemand(self):
return self._demand
def getResourceUsage(self):
usage = Resources.createResource(0, 0, 0, 0)
for appSchedulable in self._appScheds:
Resources.addTo(usage, appSchedulable.getResourceUsage())
return usage
def updateDemand(self):
self._demand = Resources.createResource(0, 0, 0, 0)
for appSchedulable in self._appScheds:
appSchedulable.updateDemand()
toAdd = appSchedulable.getDemand()
Resources.addTo(self._demand, toAdd)
def assignContainer(self, node):
assigned = Resources.none()
if node.getReservedContainer() != None:
return assigned
# debug
#print("node: " + node.getNodeID() + ", available resource: " + str(node.getAvailableResource()))
#for app in self._appScheds:
# print(app.getApp().getApplicationID(), str(app.getResourceUsage()), str(app.getAnyResourceRequest()), str(app.getMultiResFitness()))
# performance and fairness tradeoff
selectivity = 1 - self._scheduler.getTradeoff()
# first, sort by current policy
self._appScheds.sort(self._policy.getComparator())
'''print("fairness")
for app in self._appScheds:
print(app.getApp().getApplicationID())'''
#if selectivity > 1:
# pass
# second, filtering
end = int(min(len(self._appScheds), max(1, math.ceil(len(self._appScheds) * selectivity))))
selectedApps = self._appScheds[0 : end]
# thitd, sort selected list by fitness
multiResFitnessComparator = PolicyParser.getInstance("MRF", self._scheduler.getClusterCapacity()).getComparator()
selectedApps.sort(multiResFitnessComparator)
'''print("similarity")
for app in selectedApps:
print(app.getApp().getApplicationID())
print("\n")'''
#print("sort by fitness")
#for app in selectedApps:
# print(app.getApp().getApplicationID(), str(app.getMultiResFitness()))
#before allocation
'''for tmpNode in self._scheduler.getAllNodes():
print(tmpNode.getNodeID(), str(tmpNode.getAvailableResource()))'''
for app in selectedApps:
assigned = app.assignContainer(node)
if not Resources.equals(assigned, Resources.none()):
#print("app: " + app.getApp().getApplicationID() + ", assigned: " + str(assigned) + ", node: " + str(node) + ", fitness: " + str(app.getMultiResFitness()))
break
if Resources.equals(assigned, Resources.none()):
for app in self._appScheds[end:]:
assigned = app.assignContainer(node)
if not Resources.equals(assigned, Resources.none()):
#print("***app: " + app.getApp().getApplicationID() + ", assigned: " + str(assigned))
break
#after allocation
'''for tmpNode in self._scheduler.getAllNodes():
print(tmpNode.getNodeID(), str(tmpNode.getAvailableResource()))'''
# default implementation
'''
self._appScheds.sort(self._policy.getComparator())
for sched in self._appScheds:
assigned = sched.assignContainer(node)
if not Resources.equals(assigned, Resources.none()):
break'''
return assigned
def getAllAppSchedulables(self):
return self._appScheds