Initial deployment and adaptation policy reference🔗
This section provides three indicative examples of different policies for (i) static component placement, (ii) component relocation and (iii) component specification modification. Also refer to Policy plugins doc.
Example policy-staticPlacement.py🔗
initial_plan() reads the application description and places the components to the respective hosts if specified. If the none of the components has fixed node requirements, an empty plan is returned.
""" Plugin function to implement the initial deployment logic.
"""
def initial_plan(context, app_desc, system_desc):
# Store app name and description in context
context['name'] = app_desc['name']
context['spec'] = app_desc['spec']
# Set initial plan flag to True so that analyze() can trigger plan()
if 'initial_deployment_finished' not in context:
context['initial_deployment_finished'] = True
# Store app component names
context['component_names'] = []
plan = {}
for component in app_desc['spec']['components']:
comp_name = component['metadata']['name']
logger.info('component %s', comp_name)
context['component_names'].append(comp_name)
node_placement = component.get("node_placement", None)
if node_placement:
node_name = node_placement.get("node", None)
# If node has static placement requirement, add action to plan
if node_name:
plan[comp_name] = [{'action': 'deploy', 'host': node_name}]
return plan, context
analyze() just triggers plan() at the first invocation. All the subsequent calls will return False.
async def analyze(context, application_description, system_description, mechanisms, telemetry, ml_connector):
# Retrieve the first application from the list
application = application_description[0]
adaptation = False
# The first time that analyze is called, set flag to True
if 'initial_deployment_finished' not in context:
logger.info('initial deployment not finished')
adaptation = True
return adaptation, context
initial_plan() has returned the first plan, return it to Fluidity for execution. Otherwise, no adaptation occurs (empty plan). async def plan(context, application_description, system_description, mechanisms, telemetry, ml_connector):
plan_result = {}
plan_result['deployment_plan'] = {}
application = application_description[0]
description_changed = False
if 'initial_deployment_finished' not in context:
initial_plan_result, new_context = initial_plan(context, application, system_description)
if initial_plan_result:
plan_result['deployment_plan'] = initial_plan_result
plan_result['deployment_plan']['initial_plan'] = True
if plan_result['deployment_plan']:
plan_result['name'] = context['name']
new_plan = {
"fluidity": plan_result
}
logger.info('plan: New plan %s', new_plan)
return new_plan, context
Example policy-relocateComponents.py🔗
This policy relocates (deploys a new component instance on a host and removes the old one), for demo purposes, based on custom logic that invokes plan() with configurable frequency. This happens only for components that do not have strict placement requirements (node is not specified via the app description).
Setup the initial context of the policy using initialize() function.
def initialize():
print(f"Initializing policy {inspect.stack()[1].filename}")
initialContext = {
"telemetry": {
"metrics": ["node_load1"],
"system_scrape_interval": "5s"
},
"mechanisms": [
"fluidity_proxy"
],
"packages": [],
"configuration": {
"analyze_interval": "10s"
},
"latest_timestamp": None,
"core": False,
"scope": "application",
"current_placement": None,
"initial_deployment_finished": False,
"moving_interval": "30s",
"dynamic_placement_comp": None
}
return initialContext
parse_analyze_interval() converts the key stored in context to seconds in order to manually set the frequency of plan() invocation.
def parse_analyze_interval(interval: str) -> int:
"""
Parses an analyze interval string in the format 'Xs|Xm|Xh|Xd' and converts it to seconds.
Args:
interval (str): The analyze interval as a string (e.g., "5m", "2h", "1d").
Returns:
int: The interval in seconds.
Raises:
ValueError: If the format of the interval string is invalid.
"""
# Match the string using a regex: an integer followed by one of s/m/h/d
match = re.fullmatch(r"(\d+)([smhd])", interval)
if not match:
raise ValueError(f"Invalid analyze interval format: '{interval}'")
# Extract the numeric value and the time unit
value, unit = int(match.group(1)), match.group(2)
# Convert to seconds based on the unit
if unit == "s": # Seconds
return value
elif unit == "m": # Minutes
return value * 60
elif unit == "h": # Hours
return value * 60 * 60
elif unit == "d": # Days
return value * 24 * 60 * 60
else:
raise ValueError(f"Unsupported time unit '{unit}' in interval: '{interval}'")
initial_plan() checks for components without fixed placement requirements and produces the initial deployment plan.
""" Plugin function to implement the initial deployment logic.
"""
def initial_plan(context, app_desc, system_description):
logger.info('initial deployment phase ', app_desc)
context['name'] = app_desc['name']
context['spec'] = app_desc['spec']
context['initial_deployment_finished'] = True
context['component_names'] = []
plan = {}
# Random host selection to relocate between two nodes of the cluster
context['main_node'] = system_description['MLSysOpsCluster']['nodes'][0]
context['alternative_node'] = system_description['MLSysOpsCluster']['nodes'][1]
# Retrieve the first node of the node list.
context["current_placement"] = system_description['MLSysOpsCluster']['nodes'][0]
for component in app_desc['spec']['components']:
comp_name = component['metadata']['name']
logger.info('component %s', comp_name)
context['component_names'].append(comp_name)
node_placement = component.get("node_placement")
if node_placement:
node_name = node_placement.get("node", None)
if node_name:
logger.info('Found node name. Will continue')
continue
context['dynamic_placement_comp'] = comp_name
plan[comp_name] = [{'action': 'deploy', 'host': context["current_placement"]}]
logger.info('Initial plan %s', plan)
return plan, context
analyze() periodically triggers adaptation based on manual configuration in context['moving_interval'].
async def analyze(context, application_description, system_description, mechanisms, telemetry, ml_connector):
logger.info(f"\nTelemetry {telemetry}")
current_timestamp = time.time()
# The first time called
if context['latest_timestamp'] is None:
context['latest_timestamp'] = current_timestamp
return True, context
# All the next ones, get it
analyze_interval = parse_analyze_interval(context['moving_interval'])
if current_timestamp - context['latest_timestamp'] > analyze_interval:
context['latest_timestamp'] = current_timestamp
return True, context
return False, context
plan() checks the current host and relocates to the other node.
async def plan(context, application_description, system_description, mechanisms, telemetry, ml_connector):
#logger.info(f"Called relocation plan ----- {mechanisms}")
context['initial_plan'] = False
plan_result = {}
plan_result['deployment_plan'] = {}
application = application_description[0]
if 'initial_deployment_finished' in context and context['initial_deployment_finished'] == False:
initial_plan_result, new_context = initial_plan(context, application, system_description)
if initial_plan_result:
plan_result['deployment_plan'] = initial_plan_result
plan_result['deployment_plan']['initial_plan'] = True
comp_name = new_context['dynamic_placement_comp']
else:
comp_name = context['dynamic_placement_comp']
plan_result['deployment_plan']['initial_plan'] = False
plan_result['deployment_plan'][comp_name] = []
curr_plan = {}
if context['main_node'] == context["current_placement"]:
curr_plan = {
"action": "move",
"target_host": context['alternative_node'],
"src_host": context['main_node'],
}
context["current_placement"] = context['alternative_node']
elif context['alternative_node'] == context["current_placement"]:
curr_plan = {
"action": "move",
"target_host": context['main_node'],
"src_host": context['alternative_node'],
}
context["current_placement"] = context['main_node']
plan_result['deployment_plan'][comp_name].append(curr_plan)
if plan_result:
plan_result['name'] = context['name']
new_plan = {
"fluidity": plan_result,
}
logger.info('plan: New plan %s', new_plan)
return new_plan, context
Example policy-changeCompSpec.py🔗
This policy performs component specification change at runtime. We showcase 3 different changes based on: (i) Kubernetes Pod runtime class name. (ii) Container image used. (iii) Pod resource requirements (cpu and memory).
spec_changes = cycle([
{'runtime_class_name': cycle(['crun', 'nvidia'])},
{'image': cycle(['harbor.nbfc.io/mlsysops/test-app:sha-90e0077', 'harbor.nbfc.io/mlsysops/test-app:latest'])},
{'platform_requirements': {
'cpu': {
'requests': '', # in m
'limits': '' # in m
},
'memory': {
'requests': '', # in Mi
'limits': '' # in Mi
}
}
}
])
def initialize():
print(f"Initializing policy {inspect.stack()[1].filename}")
initialContext = {
"telemetry": {
"metrics": ["node_load1"],
"system_scrape_interval": "1s"
},
"mechanisms": [
"fluidity_proxy"
],
"packages": [],
"configuration": {
"analyze_interval": "30s"
},
"latest_timestamp": None,
"core": False,
"scope": "application",
"curr_comp_idx": 0,
"current_placement": None,
"initial_deployment_finished": False,
"moving_interval": "30s",
"dynamic_placement_comp": None
}
return initialContext
async def analyze(context, application_description, system_description, mechanisms, telemetry, ml_connector):
current_timestamp = time.time()
# The first time called
if context['latest_timestamp'] is None:
context['latest_timestamp'] = current_timestamp
return True, context
# All the next ones, get it
analyze_interval = parse_analyze_interval(context['moving_interval'])
logger.info(f"{current_timestamp} - {context['latest_timestamp']} = {current_timestamp - context['latest_timestamp']} with interval {analyze_interval}")
if current_timestamp - context['latest_timestamp'] > analyze_interval:
context['latest_timestamp'] = current_timestamp
return True, context
return True, context
plan() selects one of the available changes in the component spec (round-robin). We show change between 2 runtime class names, 2 container images and random component resource requirements.
async def plan(context, application_description, system_description, mechanisms, telemetry, ml_connector):
plan_result = {}
plan_result['deployment_plan'] = {}
application = application_description[0]
description_changed = False
change_idx = cycle([0, 1, 2])
curr_change = next(spec_changes)
cpu_suffix = 'm'
mem_suffix = 'Mi'
# Get the first component just for demo purposes
component = application['spec']['components'][0]
comp_name = component['metadata']['name']
logger.info(f'component spec {component}')
# If the component has fixed node placement requirement find the host
# else select the first node
if 'node_placement' in component and 'node' in component['node_placement']:
node = component['node_placement']['node']
logger.info(f'Found static placement on {node} for comp {comp_name}')
else:
node = system_description['MLSysOpsCluster']['nodes'][0]
logger.info(f'Randomly select host {node} for {comp_name}')
plan_result['deployment_plan'][comp_name] = []
for key in curr_change:
if key == 'runtime_class_name':
component[key] = next(curr_change[key])
else:
for container in component['containers']:
if key == 'image':
# Find the next image to be used and continue
container[key] = next(curr_change[key])
continue
# Set random cpu/mem requirements for the component
request_cpu = str(random.randint(0, 300))
limit_cpu = str(random.randint(301, 400))
request_mem = str(random.randint(0, 300))
limit_mem = str(random.randint(301, 400))
logger.info(f'request_cpu+cpu_suffix {request_cpu+cpu_suffix}')
if key not in container or 'cpu' not in container[key] or 'memory' not in container[key]:
container[key] = {
'cpu': {
'requests': '',
'limits': ''
},
'memory': {
'requests': '',
'limits': ''
}
}
container[key]['cpu']['requests'] = request_cpu+cpu_suffix
container[key]['cpu']['limits'] = limit_cpu+cpu_suffix
container[key]['memory']['requests'] = request_mem+mem_suffix
container[key]['memory']['limits'] = limit_mem+mem_suffix
plan_result['deployment_plan'][comp_name].append({'action': 'change_spec', 'new_spec': component, 'host': node})
logger.info(f"Applying change type {key} to comp {comp_name}, new spec is {component}")
# If there is a produced plan, extend the plan accordingly with the application name and initial_plan flag
if plan_result:
plan_result['name'] = application['name']
# This policy will only take effect after initial deployment is done.
plan_result['deployment_plan']['initial_plan'] = False
new_plan = {
"fluidity": plan_result
}
logger.info('plan: New plan %s', new_plan)
return new_plan, context