#!/usr/bin/env python3
# OpenPOWER Automated Test Project
#
# Contributors Listed Below - COPYRIGHT 2017
# [+] International Business Machines Corp.
#
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import re
import sys
import time
import datetime
import pexpect
import subprocess
import json
import requests
import cgi
import os
from .OpTestSSH import OpTestSSH
from .OpTestBMC import OpTestBMC
from .Exceptions import HTTPCheck
from .Exceptions import CommandFailed
from .OpTestConstants import OpTestConstants as BMC_CONST
from . import OpTestSystem
import logging
import OpTestLogger
log = OpTestLogger.optest_logger_glob.get_logger(__name__)
[docs]class HostManagement():
'''
HostManagement Class
OpenBMC methods to manage the Host
'''
def __init__(self, conf=None,
ip=None,
username=None,
password=None):
self.conf = conf
self.util = conf.util
self.hostname = ip
self.username = username
self.password = password
self.util.PingFunc(
self.hostname, totalSleepTime=BMC_CONST.PING_RETRY_FOR_STABILITY)
if self.conf.util_bmc_server is None:
self.conf.util.setup(config='REST')
r = self.conf.util_bmc_server.login()
self.wait_for_bmc_runtime()
[docs] def get_inventory(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Inventory Enumerate
GET
https://bmcip/xyz/openbmc_project/inventory/enumerate
'''
uri = "/xyz/openbmc_project/inventory/enumerate"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r.json().get('data')
[docs] def sensors(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Sensor Enumerate
GET
https://bmcip/xyz/openbmc_project/sensors/enumerate
'''
uri = "/xyz/openbmc_project/sensors/enumerate"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r.json().get('data')
[docs] def get_power_state(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Get Current Host Power State
GET
https://bmcip/xyz/openbmc_project/state/host0/attr/CurrentHost
'''
uri = '/xyz/openbmc_project/state/host0/attr/CurrentHostState'
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r.json().get('data')
[docs] def get_host_state(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Get Host State
GET
https://bmcip/xyz/openbmc_project/state/host0/attr/CurrentHostState
'''
uri = '/xyz/openbmc_project/state/host0/attr/CurrentHostState'
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r.json().get('data')
[docs] def soft_reboot(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Reboot Server Gracefully
PUT
https://bmcip/xyz/openbmc_project/state/host0/attr/RequestedHostTransition
"data": "xyz.openbmc_project.State.Host.Transition.Reboot"
'''
uri = "/xyz/openbmc_project/state/host0/attr/RequestedHostTransition"
payload = {"data": "xyz.openbmc_project.State.Host.Transition.Reboot"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def hard_reboot(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Reboot Server Immediately
PUT
https://bmcip/xyz/openbmc_project/state/host0/attr/RequestedHostTransition
"data": "xyz.openbmc_project.State.Host.Transition.Reboot"
'''
uri = "/xyz/openbmc_project/state/host0/attr/RequestedHostTransition"
payload = {"data": "xyz.openbmc_project.State.Host.Transition.Reboot"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def power_soft(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Power Soft Server
PUT
https://bmcip/xyz/openbmc_project/state/chassis0/attr/RequestedPowerTransition
"data": "xyz.openbmc_project.State.Chassis.Transition.Off"
'''
uri = "xyz/openbmc_project/state/chassis0/attr/RequestedPowerTransition"
payload = {"data": "xyz.openbmc_project.State.Chassis.Transition.Off"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def power_off(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Power Off Server
PUT
https://bmcip/xyz/openbmc_project/state/chassis0/attr/RequestedPowerTransition
"data": "xyz.openbmc_project.State.Chassis.Transition.Off"
'''
uri = "/xyz/openbmc_project/state/chassis0/attr/RequestedPowerTransition"
payload = {"data": "xyz.openbmc_project.State.Chassis.Transition.Off"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def power_on(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Power On Server
PUT
https://bmcip/xyz/openbmc_project/state/host0/attr/RequestedHostTransition
"data": "xyz.openbmc_project.State.Host.Transition.On"
'''
uri = "/xyz/openbmc_project/state/host0/attr/RequestedHostTransition"
payload = {"data": "xyz.openbmc_project.State.Host.Transition.On"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def list_sel(self, minutes=BMC_CONST.HTTP_RETRY):
'''
List SEL
GET
https://bmcip/xyz/openbmc_project/logging/enumerate
"data" : []
'''
log.debug("List of SEL entries")
payload = {"data": []}
uri = "/xyz/openbmc_project/logging/enumerate"
r = self.conf.util_bmc_server.get(
uri=uri, json=payload, minutes=minutes)
return r.json()
[docs] def pull_ids(self, sels=None):
'''
Pull SEL IDs
'''
id_list = []
sel_dict = {}
for key in sels:
m = re.match(r"/xyz/openbmc_project/logging/entry/(\d{1,})$", key)
if m:
id_list.append(str(sels.get(key).get('Id')))
sel_dict[str(sels.get(key).get('Id'))] = key
id_list.sort()
# sample id_list ['81', '82', '83', '84']
return id_list, sel_dict
[docs] def get_sel_ids(self, dump=False, minutes=BMC_CONST.HTTP_RETRY):
'''
Build a sorted id_list from the SELs and also
build a list of dictionary items containing the
SEL and ESEL information
:param dump: Set to True if a printed output is desired.
:type dump: Boolean
'''
sels = []
dict_list = []
json_data = self.list_sel(minutes=minutes)
log.debug("json_data={}".format(json_data))
id_list, sel_dict = self.pull_ids(sels=json_data['data'])
# sample sel_dict.get(j)=/xyz/openbmc_project/logging/entry/78
for j in id_list:
dict_item = {}
dict_item['Id'] = str(json_data['data'][sel_dict.get(j)].get('Id'))
dict_item['Timestamp'] = datetime.datetime.fromtimestamp(int(str(
json_data['data'][sel_dict.get(j)].get('Timestamp')))/1000).strftime("%Y-%m-%d %H:%M:%S")
dict_item['Message'] = json_data['data'][sel_dict.get(j)].get(
'Message')
dict_item['Description'] = json_data['data'][sel_dict.get(j)].get(
'Description')
dict_item['Severity'] = json_data['data'][sel_dict.get(j)].get(
'Severity').split('.')[-1]
dict_item['Resolved'] = json_data['data'][sel_dict.get(j)].get(
'Resolved')
dict_item['EventID'] = json_data['data'][sel_dict.get(j)].get(
'EventID')
add_data = json_data['data'][sel_dict.get(j)]['AdditionalData']
for i in range(len(add_data)):
if ("ESEL" in add_data[i]):
dict_item['esel'] = add_data[i].strip().split('=')[
1].replace(" ", "")
if ("PROCEDURE" in add_data[i]):
dict_item['Procedure'] = str(add_data[i].split('=')[1])
dict_list.append(dict_item)
# we leave this to print to stdout to allow piping to appropriate logfile or console
# TODO: When PR #361 merges collapse this and convert using OpTestUtil methods
if dump:
print(
"\n----------------------------------------------------------------------")
print("SELs")
print(
"----------------------------------------------------------------------")
if len(id_list) == 0:
print("SEL has no entries")
for k in dict_list:
print(("Id : {}".format(k.get('Id'))))
print(("Message : {}".format(k.get('Message'))))
print(("Description : {}".format(k.get('Description'))))
print(("Timestamp : {}".format(k.get('Timestamp'))))
print(("Severity : {}".format(k.get('Severity'))))
print(("Resolved : {}".format(k.get('Resolved'))))
if k.get('EventID') is not None:
print(("EventID : {}".format(k.get('EventID'))))
if k.get('Procedure') is not None:
print(("Procedure : {}".format(k.get('Procedure'))))
if k.get('esel') is not None:
print(("ESEL : characters={}\n".format(
len(k.get('esel')))))
print(
"Ruler : 0123456789012345678901234567890123456789012345678901234567890123")
print(
"-------------------------------------------------------------------------------")
for j in range(0, len(k.get('esel')), 64):
print(("{:06d}-{:06d}: {}".format(j,
j+63, k.get('esel')[j:j+64])))
else:
print("ESEL : None")
print("\n")
print(
"----------------------------------------------------------------------")
# sample id_list ['81', '82', '83', '84']
log.debug("id_list={}".format(id_list))
log.debug("dict_list={}".format(dict_list))
return id_list, dict_list
[docs] def convert_esels_to_list(self, id_list=None, dict_list=None):
esel_list = []
esel_list.append("\n----------------------------------------------------------------------")
esel_list.append("SELs")
esel_list.append("----------------------------------------------------------------------")
if len(id_list) == 0:
esel_list.append("SEL has no entries")
for k in dict_list:
esel_list.append("Id : {}".format(k.get('Id')))
esel_list.append("Message : {}".format(k.get('Message')))
esel_list.append("Description : {}".format(k.get('Description')))
esel_list.append("Timestamp : {}".format(k.get('Timestamp')))
esel_list.append("Severity : {}".format(k.get('Severity')))
esel_list.append("Resolved : {}".format(k.get('Resolved')))
if k.get('EventID') is not None:
esel_list.append("EventID : {}".format(k.get('EventID')))
if k.get('Procedure') is not None:
esel_list.append("Procedure : {}".format(k.get('Procedure')))
if k.get('esel') is not None:
esel_list.append("ESEL : characters={}\n".format(len(k.get('esel'))))
esel_list.append("Ruler : 0123456789012345678901234567890123456789012345678901234567890123")
esel_list.append("-------------------------------------------------------------------------------")
for j in range(0, len(k.get('esel')), 64):
esel_list.append("{:06d}-{:06d}: {}".format(j, j+63, k.get('esel')[j:j+64]))
else:
esel_list.append("ESEL : None")
esel_list.append("\n")
esel_list.append("----------------------------------------------------------------------")
return esel_list
[docs] def clear_sel_by_id(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Clear SEL by ID
POST
https://bmcip/xyz/openbmc_project/logging/entry/<id>/action/Delete
"data" : []
'''
log.debug("Clearing SEL entries by id")
list, dict_list = self.get_sel_ids(minutes=minutes)
log.debug("list={}".format(list))
log.debug("dict_list={}".format(dict_list))
for id in list:
uri = "/xyz/openbmc_project/logging/entry/{}/action/Delete".format(
id)
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
[docs] def verify_clear_sel(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Verify SEL is Cleared
'''
log.debug("Check if SEL really has zero entries")
list = []
list, dict_list = self.get_sel_ids(minutes=minutes)
log.debug("list={}".format(list))
log.debug("dict_list={}".format(dict_list))
if not list:
return True
return False
[docs] def clear_sel(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Clear ALL SELs
POST
https://bmcip/xyz/openbmc_project/logging/action/DeleteAll
"data" : []
'''
log.debug("Clearing ALL SELs DeleteAll")
uri = "/xyz/openbmc_project/logging/action/DeleteAll"
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
[docs] def get_current_bootdev(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Current Boot Device Info
GET
https://bmcip/xyz/openbmc_project/control/host0/boot/attr/BootMode
'''
uri = "/xyz/openbmc_project/control/host0/boot/attr/BootMode"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
json_data = r.json().get('data')
bootmode = ""
if "Setup" in json_data:
bootmode = "Setup"
elif "Regular" in json_data:
bootmode = "Regular"
return bootmode
[docs] def set_bootdev_to_setup(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Set boot device to setup
PUT
https://bmcip/xyz/openbmc_project/control/host0/boot/attr/BootMode
"data": "xyz.openbmc_project.Control.Boot.Mode.Modes.Setup"
https://bmcip/xyz/openbmc_project/control/host0/boot/one_time/attr/Enabled
"data": 0
'''
uri = "/xyz/openbmc_project/control/host0/boot/attr/BootMode"
payload = {"data": "xyz.openbmc_project.Control.Boot.Mode.Modes.Setup"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
uri = "/xyz/openbmc_project/control/host0/boot/one_time/attr/Enabled"
payload = {"data": 0}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def set_bootdev_to_none(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Set boot device to regular/default
PUT
https://bmcip/xyz/openbmc_project/control/host0/boot/attr/BootMode
"data": "xyz.openbmc_project.Control.Boot.Mode.Modes.Regular"
https://bmcip/xyz/openbmc_project/control/host0/boot/one_time/attr/Enabled
"data": 0
'''
uri = "/xyz/openbmc_project/control/host0/boot/attr/BootMode"
payload = {"data": "xyz.openbmc_project.Control.Boot.Mode.Modes.Regular"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
uri = "/xyz/openbmc_project/control/host0/boot/one_time/attr/Enabled"
payload = {"data": 0}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def get_boot_progress(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Boot Progress Info
GET
https://bmcip//xyz/openbmc_project/state/host0/attr/BootProgress
'''
uri = "/xyz/openbmc_project/state/host0/attr/BootProgress"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r.json().get('data')
[docs] def wait_for_host_state(self, target_state, host=0, minutes=10):
'''
Wait for OpenBMC Host state
GET
https://bmcip//xyz/openbmc_project/state/host0
"data" : []
'''
status = self.wait_bmc(token="/xyz/openbmc_project/state/host{}".format(host),
key="CurrentHostState",
value_target="xyz.openbmc_project.State.Host.HostState.{}".format(
target_state),
minutes=minutes)
return status
[docs] def wait_for_standby(self, timeout=10):
'''
Wait for Standby
'''
r = self.wait_for_host_state("Off", minutes=timeout)
[docs] def wait_for_runtime(self, timeout=10):
'''
Wait for Runtime
'''
r = self.wait_for_host_state("Running", minutes=timeout)
[docs] def bmc_reset(self, minutes=BMC_CONST.HTTP_RETRY):
'''
BMC reset
PUT
https://bmcip/xyz/openbmc_project/state/bmc0/attr/RequestedBMCTransition
"data" : "xyz.openbmc_project.State.BMC.Transition.Reboot"
'''
uri = "/xyz/openbmc_project/state/bmc0/attr/RequestedBMCTransition"
payload = {"data": "xyz.openbmc_project.State.BMC.Transition.Reboot"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
# Wait for BMC to go down.
self.util.ping_fail_check(self.hostname)
# Wait for BMC to ping back.
self.util.PingFunc(
self.hostname, totalSleepTime=BMC_CONST.PING_RETRY_POWERCYCLE)
# Wait for BMC ready state.
self.wait_for_bmc_runtime()
[docs] def get_bmc_state(self):
'''
Get BMC State
'''
# caller drives retry
uri = "/xyz/openbmc_project/state/bmc0/attr/CurrentBMCState"
r = self.conf.util_bmc_server.get(uri=uri)
if r.status_code != requests.codes.ok:
problem = "[{}] Description={}".format(r.json().get(
'message'), "".join(r.json().get('data').get('description')))
raise HTTPCheck(
message="HTTP problem getting CurrentBMCState {}".format(problem))
return r.json().get('data')
[docs] def wait_bmc(self, key=None, value_target=None, token=None, minutes=10):
'''
Wait on BMC
Given a token, target, key wait for a match
'''
# handles data as a dictionary or string
timeout = time.time() + 60*minutes
while True:
r = self.conf.util_bmc_server.get(uri=token, minutes=minutes)
if type(r.json().get('data')) == type(dict()):
if key is None:
for key, value in r.json().get('data'):
if r.json().get('data').get(key) == value_target:
break
else:
if r.json().get('data').get(key) == value_target:
break
else:
if value_target in r.json().get('data'):
break
if time.time() > timeout:
log.warning("We timed out waiting for \"{}\", we waited {} minutes for \"{}\"".format(
token, minutes, value_target))
raise HTTPCheck(message="HTTP problem getting \"{}\", we waited {} minutes for \"{}\"".format(
token, minutes, value_target))
time.sleep(5)
return True
[docs] def wait_for_bmc_runtime(self, timeout=10):
'''
Wait for BMC Runtime
'''
status = self.wait_bmc(token="/xyz/openbmc_project/state/bmc0/attr/CurrentBMCState",
value_target="xyz.openbmc_project.State.BMC.BMCState.Ready",
minutes=timeout)
return status
[docs] def get_list_of_image_ids(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Get list of image IDs
GET
https://bmcip/xyz/openbmc_project/software
'''
uri = "/xyz/openbmc_project/software/"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
log.debug("Image IDs={}".format(r.json()))
ids = []
for k in r.json().get('data'):
log.debug("Image Data Info k={}".format(k))
m = re.match(r'/xyz/openbmc_project/software/(.*)', k)
# According to the OpenBMC docs, Image ID can be
# Implementation defined, and thus, the word 'active'
# would be a valid ID.
# Except that it isn't. The documentation is lies.
# It seems that 'active' is special.
# There is no documentation as to what other
# strings are special, or could become special.
# So, HACK HACK HACK around it. :(
# https://github.com/openbmc/phosphor-dbus-interfaces/tree/55d03ca/xyz/openbmc_project/Software#image-identifier
# This is getting insane, it's not just 'active'
# It's 'functional' as well. Or some other things.
# So, we assume that if we don't have 'Purpose' it's something special
# like the 'active' or (new) 'functional'.
# Adriana has promised me that this is safe into the future.
if m:
log.debug("Image Data Info ID={}: {}".format(m.group(1), k))
i = self.image_data(m.group(1))
if i['data'].get('Purpose') is not None:
ids.append(m.group(1))
log.debug("List of images id's: {}".format(ids))
return ids
[docs] def image_data(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Get Image Info By ID
GET
https://bmcip/xyz/openbmc_project/software/id
'''
uri = "/xyz/openbmc_project/software/{}".format(id)
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
log.debug("Image ID={} Data={}".format(id, r.json()))
return r.json()
[docs] def upload_image(self, image, minutes=BMC_CONST.HTTP_RETRY):
'''
Upload an image, OpenBMC imposes validation on files uploaded
POST
https://bmcip/upload/image
"file" : file-like-object
'''
with open(image, 'rb') as fileload:
uri = "/upload/image"
octet_hdr = {'Content-Type': 'application/octet-stream'}
r = self.conf.util_bmc_server.post(uri=uri,
headers=octet_hdr,
data=fileload)
if r.status_code != 200:
print((r.headers))
print((r.text))
print(r)
[docs] def get_image_priority(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Get Image Priority - 0=primary, boot side of the image
'''
json_data = self.image_data(id, minutes=minutes)
log.debug("Image Data: {}".format(json_data))
# we use the strings below for easy grepping of debug logs
log.debug("Image Data Info ID: {}".format(id))
log.debug("Image Data Info Priority: {}".format(json_data.get('data').get('Priority')))
log.debug("Image Data Info Purpose: {}".format(json_data.get('data').get('Purpose')))
log.debug("Image Data Info Version: {}".format(json_data.get('data').get('Version')))
return json_data.get('data').get('Priority')
[docs] def set_image_priority(self, id, level, minutes=BMC_CONST.HTTP_RETRY):
'''
Set Image Priority
PUT
https://bmcip/xyz/openbmc_project/software/<id>/attr/Priority
"data" : 0
'''
# xyz.openbmc_project.Software.RedundancyPriority Priority y 0
uri = "/xyz/openbmc_project/software/{}/attr/Priority".format(id)
payload = {"data" : int(level)}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def image_ready_for_activation(self, id, timeout=10):
'''
Image Activation Ready
IS THIS USED ? CAN IT BE REMOVED ?
'''
timeout = time.time() + 60*timeout
while True:
json_data = self.image_data(id, minutes=BMC_CONST.HTTP_RETRY)
log.debug("Image JSON : {}".format(json_data))
if json_data.get('data').get('Activation') \
== "xyz.openbmc_project.Software.Activation.Activations.Ready":
log.debug("Image upload is successful & Ready for activation")
break
if time.time() > timeout:
raise HTTPCheck(
message="Image is not ready for activation/Timeout happened")
time.sleep(5)
return True
[docs] def activate_image(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Activate An Image
PUT
https://bmcip/xyz/openbmc_project/software/<image id>/attr/RequestedActivation
"data": "xyz.openbmc_project.Software.Activation.RequestedActivations.Active"
'''
uri = "/xyz/openbmc_project/software/{}/attr/RequestedActivation".format(
id)
payload = {
"data": "xyz.openbmc_project.Software.Activation.RequestedActivations.Active"}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def delete_image(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Delete An Image
POST
https://bmcip/xyz/openbmc_project/software/<id>/action/delete
"data" : []
'''
try:
# First, we try the "new" method, as of at least ibm-v2.0-0-r26.1-0-gfb7714a
uri = "/xyz/openbmc_project/software/{}/action/delete".format(id)
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
except Exception as e:
# Try falling back to the old method (everything prior? who knows)
uri = "/xyz/openbmc_project/software/{}".format(id)
payload = {"data": []}
r = self.conf.util_bmc_server.delete(
uri=uri, json=payload, minutes=minutes)
[docs] def wait_for_image_active_complete(self, id, timeout=10):
'''
Wait For Image Active Complete
'''
timeout = time.time() + 60*timeout
while True:
json_data = self.image_data(id, minutes=BMC_CONST.HTTP_RETRY)
if json_data.get('data').get('Activation') \
== "xyz.openbmc_project.Software.Activation.Activations.Activating":
log.info("Image activation is in progress")
if json_data.get('data').get('Activation') \
== "xyz.openbmc_project.Software.Activation.Activations.Active":
log.info(
"Image activated successfully, Good to go for power on....")
break
if json_data.get('data').get('Activation') \
== "xyz.openbmc_project.Software.Activation.Activations.Failed":
log.error("Image activation failed. Try --run testcases.testRestAPI.HostOff.test_field_mode_enable_disable, which leaves field mode disabled.")
log.warning("Bug reported SW461922 : OP930:FVT: Software Field mode disable operation does not throw error or warning, future this will not work")
log.warning("You need to factory reset the BMC which requires re-setting BMC IP's (BMC serial connection needed due to loss of networking")
log.warning("Alternate methods are via BMC busctl commands")
return False
if time.time() > timeout:
raise HTTPCheck(
message="Image is failed to activate/Timeout happened")
time.sleep(5)
return True
[docs] def host_image_ids(self):
'''
Get List of Host Image IDs
'''
return self.image_ids(purpose='xyz.openbmc_project.Software.Version.VersionPurpose.Host')
[docs] def bmc_image_ids(self):
'''
Get List of BMC Image IDs
'''
return self.image_ids(purpose='xyz.openbmc_project.Software.Version.VersionPurpose.BMC')
[docs] def image_ids(self, purpose=None, minutes=BMC_CONST.HTTP_RETRY):
'''
Get List of Image IDs
'''
l = self.get_list_of_image_ids(minutes=minutes)
for id in l[:]:
i = self.image_data(id, minutes=minutes)
# Here, we assume that if we don't have 'Purpose' it's something special
# like the 'active' or (new) 'functional'.
# Adriana has promised me that this is safe.
log.debug("Image Data Info ID: {}".format(i))
if i['data'].get('Purpose') != purpose:
log.debug("Removing Image ID: {} "
"Purpose={} does not match purpose={}"
.format(id, i.get('data').get('Purpose'), purpose))
l.remove(id)
log.debug("ALL Image IDs: {} purpose={}".format(l, purpose))
return l
[docs] def list_available_dumps(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Listing available Dumps:
GET
https://bmcip/xyz/openbmc_project/dump/list
'''
uri = "/xyz/openbmc_project/dump/list"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r
[docs] def get_dump_ids(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Get Dump IDs
'''
dump_ids = []
r = self.list_available_dumps(minutes=BMC_CONST.HTTP_RETRY)
log.debug("Dump IDs: {}".format(r.json()))
for k in r.json().get('data'):
log.debug("k={}".format(k))
m = re.match(r"/xyz/openbmc_project/dump/entry/(\d{1,})$", k)
if m:
dump_ids.append(m.group(1))
log.debug("Dump IDs: {}".format(dump_ids))
return dump_ids
[docs] def download_dump(self, dump_id, minutes=BMC_CONST.HTTP_RETRY):
'''
Download Dump
GET
https://bmcip/download/dump/<id>
'''
uri = "/download/dump/{}".format(dump_id)
r = self.conf.util_bmc_server.get(
uri=uri, stream=True, minutes=minutes)
value, params = cgi.parse_header(r.headers.get('Content-Disposition'))
with open(os.path.join(self.conf.logdir, params.get('filename')), 'wb') as f:
f.write(r.content)
[docs] def delete_dump(self, dump_id, minutes=BMC_CONST.HTTP_RETRY):
'''
Delete dump
POST
https://bmcip/xyz/openbmc_project/dump/entry/<dump_id>/action/Delete
'''
uri = "/xyz/openbmc_project/dump/entry/{}/action/Delete".format(
dump_id)
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
return r
[docs] def delete_all_dumps(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Delete all Dumps
'''
ids = self.get_dump_ids()
for id in ids:
self.delete_dump(id, minutes=BMC_CONST.HTTP_RETRY)
log.debug("Deleting Dump ID={}".format(id))
[docs] def create_new_dump(self, minutes=None):
'''
Create new Dump
POST
https://bmcip/xyz/openbmc_project/dump/action/CreateDump
"data" : []
'''
uri = "/xyz/openbmc_project/dump/action/CreateDump"
payload = {"data": []}
try:
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
log.info("OpenBMC Dump capture was successful")
return r.json().get('data')
except Exception as e:
log.debug("Create Dump Exception={}".format(e))
log.info(
"Dumps are exceeded in the system, trying to delete existing ones")
self.delete_all_dumps()
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
log.info("RETRY ATTEMPT OpenBMC Dump capture was successful")
return r.json().get('data')
[docs] def wait_for_dump_finish(self, dump_id, counter=30):
'''
Wait for Dump to Finish
'''
for i in range(counter):
ids = self.get_dump_ids()
if str(dump_id) in ids:
log.debug(
"Dump ID={} is ready to download/offload".format(dump_id))
return True
time.sleep(5)
else:
return False
[docs] def software_enumerate(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Software Enumerate
GET
https://bmcip/xyz/openbmc_project/software
'''
uri = "/xyz/openbmc_project/software"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
return r
[docs] def has_field_mode_set(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Field Mode Enabled : 0=disabled 1=enabled
'''
r = self.software_enumerate(minutes=minutes)
try:
if int(r.json().get('data').get('FieldModeEnabled')) == 1:
log.debug("has_field_mode_set=1")
return True
else:
log.debug("has_field_mode_set NOT True")
except Exception as e:
log.debug(
"Unable to get FieldModeEnabled so returning False, Exception={}".format(e))
return False
[docs] def set_field_mode(self, mode, minutes=BMC_CONST.HTTP_RETRY):
'''
Set field mode : 0=disable 1=enable
PUT
https://bmcip/xyz/openbmc_project/software/attr/FieldModeEnabled
"data": 0
'''
uri = "/xyz/openbmc_project/software/attr/FieldModeEnabled"
payload = {"data": int(mode)}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
log.debug("set_field_mode={}".format(r))
[docs] def validate_functional_bootside(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Functional Boot Side Validation for both BMC and PNOR
GET
https://bmcip/xyz/openbmc_project/software/functional
{
"data": {
"endpoints": [
"/xyz/openbmc_project/software/061c4bdb",
"/xyz/openbmc_project/software/608e9ebe"
]
}
'''
uri = "/xyz/openbmc_project/software/functional"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
log.debug("Functional Boot Side Validation: r.text={}".format(r.text))
if id in r.text:
return True
return False
[docs] def is_image_already_active(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Check if Image is Active
'''
json_data = self.image_data(id, minutes=minutes)
if json_data.get('data').get('Activation') \
== "xyz.openbmc_project.Software.Activation.Activations.Active":
return True
return False
[docs] def get_occ_ids(self, minutes=BMC_CONST.HTTP_RETRY):
'''
GET
https://bmcip/org/open_power/control/enumerate
'''
uri = "/org/open_power/control/enumerate"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
occ_ids = []
for k in r.json().get('data'):
log.debug("k={}".format(k))
m = re.match(r"/org/open_power/control/occ(\d{1,})$", k)
if m:
occ_ids.append(m.group(1))
log.debug("OCC IDs: {}".format(occ_ids))
return occ_ids
[docs] def is_occ_active(self, id, minutes=BMC_CONST.HTTP_RETRY):
'''
Get state of OCC's
GET
https://bmcip/org/open_power/control/occ0
'''
uri = "/org/open_power/control/occ{}".format(id)
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
if r.json().get('data').get('OccActive') == 1:
log.debug("# OCC{} is active".format(id))
return True
log.debug("# OCC{} is not active".format(id))
return False
[docs] def enable_power_cap(self, enable, minutes=BMC_CONST.HTTP_RETRY):
'''
Enable Power Cap : 0=disabled by default 1=enabled
PUT
https://bmcip/xyz/openbmc_project/control/host0/power_cap/attr/PowerCapEnable
"data" : 0
'''
uri = "/xyz/openbmc_project/control/host0/power_cap/attr/PowerCapEnable"
payload = {"data": int(enable)}
r = self.conf.util_bmc_server.put(
uri=uri, json=payload, minutes=minutes)
[docs] def power_cap_enable(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Enables the Power Cap
'''
self.enable_power_cap("1")
[docs] def power_cap_disable(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Disables the Power Cap
'''
self.enable_power_cap("0")
[docs] def get_power_cap_settings(self, minutes=BMC_CONST.HTTP_RETRY):
'''
GET
https://bmcip/xyz/openbmc_project/control/host0/power_cap
'''
uri = "/xyz/openbmc_project/control/host0/power_cap"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
PowerCapEnable = r.json().get('data').get('PowerCapEnable')
PowerCap = r.json().get('data').get('PowerCap')
return PowerCapEnable, PowerCap
[docs] def clear_gard_records(self, minutes=BMC_CONST.HTTP_RETRY):
'''
POST
https://bmcip/org/open_power/control/gard/action/Reset
'''
uri = "/org/open_power/control/gard/action/Reset"
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
[docs] def factory_reset_software(self, minutes=BMC_CONST.HTTP_RETRY):
'''
POST
https://bmcip/xyz/openbmc_project/software/action/Reset
"data": []"
'''
uri = "/xyz/openbmc_project/software/action/Reset"
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
[docs] def factory_reset_network(self, minutes=BMC_CONST.HTTP_RETRY):
'''
POST
https://bmcip/xyz/openbmc_project/network/action/Reset
"data": []"
'''
uri = "/xyz/openbmc_project/network/action/Reset"
payload = {"data": []}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
[docs] def update_root_password(self, password, minutes=BMC_CONST.HTTP_RETRY):
'''
Update Root Password
POST
https://bmcip/xyz/openbmc_project/user/root/action/SetPassword
"data" : ["abc123"]
'''
uri = "/xyz/openbmc_project/user/root/action/SetPassword"
payload = {"data": [str(password)]}
r = self.conf.util_bmc_server.post(
uri=uri, json=payload, minutes=minutes)
[docs] def is_tpm_enabled(self, minutes=BMC_CONST.HTTP_RETRY):
'''
TPM Enablement Check 0=TPMEnable cleared 1=TPMEnable set
GET
https://bmcip/xyz/openbmc_project/control/host0/TPMEnable
'''
uri = "/xyz/openbmc_project/control/host0/TPMEnable"
r = self.conf.util_bmc_server.get(uri=uri, minutes=minutes)
if r.json().get('data').get('TPMEnable') == 1:
log.debug("# TPMEnable is set")
return True
log.debug("# TPMEnable is cleared")
return False
[docs] def enable_tpm(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Enable TPM
'''
self.configure_tpm_enable(1)
[docs] def disable_tpm(self, minutes=BMC_CONST.HTTP_RETRY):
'''
Disable TPM
'''
self.configure_tpm_enable(0)
[docs]class OpTestOpenBMC():
def __init__(self, ip=None, username=None, password=None, ipmi=None,
rest_api=None, logfile=sys.stdout,
check_ssh_keys=False, known_hosts_file=None):
self.hostname = ip
self.username = username
self.password = password
self.ipmi = ipmi
self.rest_api = rest_api
self.has_vpnor = None
self.logfile = logfile
self.console = OpTestSSH(ip, username, password, port=2200,
logfile=self.logfile, check_ssh_keys=check_ssh_keys,
known_hosts_file=known_hosts_file)
self.bmc = OpTestBMC(ip=self.hostname,
username=self.username,
password=self.password,
logfile=self.logfile,
check_ssh_keys=check_ssh_keys,
known_hosts_file=known_hosts_file)
[docs] def set_system(self, system):
self.console.set_system(system)
self.bmc.set_system(system)
[docs] def query_vpnor(self, minutes=BMC_CONST.HTTP_RETRY):
if self.has_vpnor is not None:
return self.has_vpnor
# As of June 2020, we came to the determination that:
# * Both VPNOR and non-VPNOR OpenBMCs can answer with an inventory of Host
# images (using Redfish APIs)
# * Both VPNOR and non-VPNOR OpenBMCs can have that above list empty
# * So the only reliable way of checking for VPNOR capability is to check
# for 'pflash' presence in the BMC, since OpenBMC should be bundling it
# ONLY on systems that are NOT capable of VPNOR:
self.has_vpnor = not self.bmc.validate_pflash_tool()
log.debug("System is " + ("" if self.has_vpnor else "NOT") + " VPNOR-capable")
return self.has_vpnor
[docs] def reboot(self):
self.bmc.reboot()
# After a BMC reboot, wait for it to reach ready state
self.rest_api.wait_for_bmc_runtime()
[docs] def reboot_nowait(self):
# Reboot BMC but do not wait for it to come back
self.bmc.reboot_nowait()
[docs] def image_transfer(self, i_imageName, copy_as=None):
self.bmc.image_transfer(i_imageName, copy_as=copy_as)
[docs] def pnor_img_flash_openbmc(self, pnor_name):
self.bmc.pnor_img_flash_openbmc(pnor_name)
[docs] def skiboot_img_flash_openbmc(self, lid_name):
if not self.query_vpnor():
log.debug("We do NOT have vpnor, so calling old pflash")
self.bmc.skiboot_img_flash_openbmc(lid_name)
else:
# don't ask. There appears to be a bug where we need to be 4k aligned.
self.bmc.run_command(
"dd if=/dev/zero of=/dev/stdout bs=1M count=1 | tr '\\000' '\\377' > /tmp/ones")
self.bmc.run_command(
"cat /tmp/%s /tmp/ones > /tmp/padded" % lid_name)
self.bmc.run_command(
"dd if=/tmp/padded of=/usr/local/share/pnor/PAYLOAD bs=1M count=1")
#self.bmc.run_command("mv /tmp/%s /usr/local/share/pnor/PAYLOAD" % lid_name, timeout=60)
[docs] def skiroot_img_flash_openbmc(self, lid_name):
if not self.query_vpnor():
self.bmc.skiroot_img_flash_openbmc(lid_name)
else:
# don't ask. There appears to be a bug where we need to be 4k aligned.
#self.bmc.run_command("dd if=/dev/zero of=/dev/stdout bs=16M count=1 | tr '\\000' '\\377' > /tmp/ones")
#self.bmc.run_command("cat /tmp/%s /tmp/ones > /tmp/padded" % lid_name)
try:
# seems the success on following command gives echo $?=1
#self.bmc.run_command("dd if=/tmp/padded of=/usr/local/share/pnor/BOOTKERNEL bs=16M count=1")
self.bmc.run_command("rm -f /usr/local/share/pnor/BOOTKERNEL", timeout=60)
self.bmc.run_command("ln -s /tmp/{} /usr/local/share/pnor/BOOTKERNEL".format(lid_name), timeout=60)
except CommandFailed as e:
log.warning("FLASHING CommandFailed, check that this is ok for your setup")
[docs] def flash_part_openbmc(self, lid_name, part_name):
if not self.query_vpnor():
self.bmc.flash_part_openbmc(lid_name, part_name)
else:
self.bmc.run_command(
"mv /tmp/%s /usr/local/share/pnor/%s" % (lid_name, part_name), timeout=60)
[docs] def clear_field_mode(self):
self.bmc.run_command("fw_setenv fieldmode")
self.bmc.run_command("systemctl unmask usr-local.mount")
log.info("clear_field_mode rebooting the BMC")
self.reboot()
[docs] def bmc_host(self):
return self.hostname
[docs] def get_ipmi(self):
return self.ipmi
[docs] def get_host_console(self):
return self.console
[docs] def get_rest_api(self):
return self.rest_api
[docs] def run_command(self, command, timeout=10, retry=0):
return self.bmc.run_command(command, timeout, retry)
[docs] def has_inband_bootdev(self):
return False
[docs] def has_os_boot_sensor(self):
return False
[docs] def has_host_status_sensor(self):
return False
[docs] def has_occ_active_sensor(self):
return False
[docs] def has_ipmi_sel(self):
return False
[docs] def supports_ipmi_dcmi(self):
return False