# thoth-lab
# Copyright(C) 2020 Francesco Murdaca
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Adviser results processing and analysis."""
import copy
import hashlib
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly
import plotly.graph_objs as go
import seaborn as sns
from numpy import array
from plotly.offline import iplot
from sklearn.preprocessing import LabelEncoder
from thoth.storages import AdvisersResultsStore
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
_LOGGER = logging.getLogger("thoth.lab.adviser")
def _adviser_version_key(version: str) -> tuple:
    """Turn a dotted numeric version such as "0.10.2" into a tuple usable for ordering."""
    parts = [int(part) for part in version.split(".")]
    # Drop trailing zeros so that "0.7" and "0.7.0" compare as equal.
    while parts and parts[-1] == 0:
        parts.pop()
    return tuple(parts)


def aggregate_adviser_results(adviser_version: str, limit_results: bool = False, max_ids: int = 5) -> pd.DataFrame:
    """Aggregate adviser results from jsons stored in Ceph.

    Documents that cannot be retrieved or parsed are logged and skipped, so one
    broken document does not abort the whole aggregation.

    :param adviser_version: minimum adviser version considered for the analysis of adviser runs
    :param limit_results: reduce the number of adviser runs ids considered to `max_ids` to test analysis
    :param max_ids: maximum number of adviser runs ids considered
    :return: data frame with one row per collected adviser run, as built by `_create_adviser_dataframe`
    :raises ValueError: if `adviser_version` is not a dotted sequence of integers
    """
    adviser_store = AdvisersResultsStore()
    adviser_store.connect()

    adviser_ids = list(adviser_store.get_document_listing())
    number_adviser_results = len(adviser_ids)
    _LOGGER.info("Number of Adviser reports identified is: %r", number_adviser_results)

    # Versions are compared component by component; concatenating the digits
    # would rank "1.0" (10) below "0.9.9" (99).
    minimum_version = _adviser_version_key(adviser_version)

    adviser_dict = {}
    current_a_counter = 1

    if limit_results:
        _LOGGER.debug("Limiting results to %s to test functions!!", max_ids)

    for ids in adviser_ids:
        try:
            document = adviser_store.retrieve_document(ids)
            metadata = document["metadata"]
            datetime_advise_run = metadata.get("datetime")
            analyzer_version = metadata.get("analyzer_version")
            _LOGGER.debug("Analysis n.%s/%s", current_a_counter, number_adviser_results)
            result = document["result"]
            _LOGGER.debug(ids)

            if _adviser_version_key(analyzer_version) >= minimum_version:
                report = result.get("report")
                error = result["error"]
                if error:
                    error_msg = result["error_msg"]
                    adviser_dict[ids] = {
                        "justification": [{"message": error_msg, "type": "ERROR"}],
                        "error": error,
                        "message": error_msg,
                        "type": "ERROR",
                    }
                else:
                    adviser_dict = extract_adviser_justifications(report=report, adviser_dict=adviser_dict, ids=ids)

            # Only runs that produced an entry get the run metadata attached.
            if ids in adviser_dict:
                adviser_dict[ids]["datetime"] = datetime.strptime(datetime_advise_run, "%Y-%m-%dT%H:%M:%S.%f")
                adviser_dict[ids]["analyzer_version"] = analyzer_version

            current_a_counter += 1

            if limit_results and current_a_counter > max_ids:
                break

        except Exception as exc:
            # Best effort: keep going with the remaining documents.
            _LOGGER.warning("Skipping adviser document %r: %s", ids, exc)

    return _create_adviser_dataframe(adviser_dict)
def _create_adviser_dataframe(adviser_data: dict):
    """Build the adviser results data frame from the collected per-run data.

    :param adviser_data: mapping of adviser run id to a dict of collected fields
    :return: data frame with one row per run id, the collected fields as columns
        and an extra `date` column parsed from `datetime`
    """
    fields = ["datetime", "analyzer_version", "error", "justification", "message", "type"]
    # The frame is first built with fields as rows, then flipped so each run is a row.
    per_run = pd.DataFrame(adviser_data, index=fields).T
    per_run["date"] = pd.to_datetime(per_run["datetime"])
    return per_run
def create_final_dataframe(adviser_dataframe: pd.DataFrame) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Create final dataframes with all information required for plots.

    Every row gets a `jm_hash_id_encoded` integer identifying its message: messages are
    hashed with SHA-256 and the distinct digests are numbered in sorted order, so rows
    sharing a message share an id.

    :param adviser_dataframe: data frame as returned by `aggregate_adviser_results` method.
    :return: tuple of (all rows, rows whose `type` is "INFO", rows whose `type` is "ERROR"),
        each including the `jm_hash_id_encoded` column. The input frame is not modified.
    """
    message_digests = [
        hashlib.sha256(bytes(message, "raw_unicode_escape")).hexdigest()
        for message in adviser_dataframe["message"]
    ]
    # `return_inverse` gives, for each digest, its position among the sorted unique digests.
    _, encoded_ids = np.unique(np.array(message_digests), return_inverse=True)

    # Work on a deep copy so nested objects (e.g. justification lists) are not shared with the input.
    justification_result = copy.deepcopy(adviser_dataframe.to_dict())
    justification_result["jm_hash_id_encoded"] = dict(zip(adviser_dataframe.index, encoded_ids))

    total_dataframe = pd.DataFrame(justification_result)
    info_dataframe = total_dataframe[total_dataframe["type"] == "INFO"]
    error_dataframe = total_dataframe[total_dataframe["type"] == "ERROR"]

    return total_dataframe, info_dataframe, error_dataframe
def create_adviser_results_histogram(plot_df: pd.DataFrame):
    """Plot a bar chart of how often each adviser justification occurs.

    The chart is rendered inline through plotly's offline notebook mode.

    :param plot_df: dataframe for plot of adviser results; must provide the
        `jm_hash_id_encoded`, `message` and `type` columns
    :return: data frame with one row per encoded justification id (label, message,
        type and count), sorted by descending count
    """
    plotly.offline.init_notebook_mode(connected=True)

    # Count occurrences once instead of recomputing them inside the loop.
    occurrences = plot_df["jm_hash_id_encoded"].value_counts()

    histogram_data = {}
    for _, row in plot_df[["jm_hash_id_encoded", "message", "type"]].iterrows():
        encoded_id = row["jm_hash_id_encoded"]
        if encoded_id in histogram_data:
            continue
        # The first row seen for an id provides its message and type.
        histogram_data[encoded_id] = {
            "jm_hash_id_encoded": f"type-{encoded_id}",
            "message": row["message"],
            "type": row["type"],
            "count": occurrences[encoded_id],
        }

    justifications_df = pd.DataFrame(histogram_data)
    justifications_df = justifications_df.transpose()
    justifications_df = justifications_df.sort_values(by="count", ascending=False)

    X = justifications_df["jm_hash_id_encoded"]  # noqa N806
    Y = justifications_df["count"]  # noqa N806

    trace1 = go.Bar(
        x=X,
        y=Y,
        name="Adviser==0.7.3 justifications",
        hovertext=[y[0] for y in justifications_df[["message"]].values],
        hoverinfo="text",
        marker=dict(
            color=justifications_df["count"], colorscale="Viridis", opacity=0.8, showscale=True  # choose a colorscale
        ),
    )

    data = [trace1]

    margin = {"l": 0, "r": 0, "b": 0, "t": 0}
    # NOTE(review): `scene` configures 3D subplots; for a bar chart the axis titles
    # presumably belong under `xaxis`/`yaxis` -- confirm before changing the rendered plot.
    layout = go.Layout(
        title="Adviser justifications",
        margin=margin,
        scene=dict(xaxis=dict(title="Justification encoded ID"), yaxis=dict(title="Counter")),
        showlegend=True,
        legend=dict(orientation="h"),
    )

    fig = go.Figure(data=data, layout=layout)

    iplot(fig, filename="bar-plot")

    return justifications_df
def _aggregate_data_per_interval(adviser_justification_df: pd.DataFrame):
    """Aggregate advise justifications per weekly time intervals.

    The span between the earliest and latest `date` is cut into consecutive 7-day
    buckets; the last bucket is shortened so that it ends at the latest date. Each
    row falls into exactly one bucket: the first bucket includes its lower bound,
    every following bucket excludes it.

    :param adviser_justification_df: data frame providing the `date`,
        `jm_hash_id_encoded` and `message` columns
    :return: dict keyed by the upper bound of each bucket; each value maps an encoded
        justification id to a dict with its label, message and count inside that
        bucket. An empty input yields an empty dict.
    """
    if adviser_justification_df.empty:
        return {}

    dates = adviser_justification_df["date"].values
    begin = min(dates)
    end = max(dates)
    delta = np.timedelta64(7, "D")

    # Bucket edges: begin, begin + 7d, begin + 14d, ..., end.
    # A span shorter than one week still produces the single bucket [begin, end].
    edges = [begin]
    upper = begin + delta
    while upper < end:
        edges.append(upper)
        upper = upper + delta
    edges.append(end)

    aggregated_data = {}
    for position, (low, high) in enumerate(zip(edges, edges[1:])):
        not_after_high = adviser_justification_df["date"] <= high
        if position == 0:
            not_before_low = adviser_justification_df["date"] >= low
        else:
            # Rows sitting exactly on an edge were already counted in the previous bucket.
            not_before_low = adviser_justification_df["date"] > low
        subset_df = adviser_justification_df[not_before_low & not_after_high]

        # Count once per bucket rather than once per row.
        occurrences = subset_df["jm_hash_id_encoded"].value_counts()

        bucket = {}
        for encoded_id, message in zip(subset_df["jm_hash_id_encoded"], subset_df["message"]):
            if encoded_id not in bucket:
                bucket[encoded_id] = {
                    "jm_hash_id_encoded": f"type-{encoded_id}",
                    "message": message,
                    "count": occurrences[encoded_id],
                }
        aggregated_data[high] = bucket

    return aggregated_data
def _create_heatmaps_values(input_data: dict, advise_encoded_type: List[int]):
    """Collect, for every advise type, its count in each interval.

    :param input_data: mapping of interval key to a mapping of advise type to a dict
        holding at least a `count` entry
    :param advise_encoded_type: encoded advise types to look up; duplicates are ignored
    :return: dict mapping each distinct advise type to a list of counts, one per
        interval in the iteration order of `input_data`, with 0 where the type is absent
    """
    counts_per_type = {}
    for encoded_type in set(advise_encoded_type):
        _LOGGER.debug(f"Analyzing advise type... {encoded_type}")
        per_interval = []
        for interval_end, runs in input_data.items():
            _LOGGER.debug(f"Checking for that advise type in 'interval'... {interval_end}")
            # Intervals in which the type never occurred contribute a zero.
            per_interval.append(runs[encoded_type]["count"] if encoded_type in runs else 0)
        counts_per_type[encoded_type] = per_interval
    return counts_per_type
def create_adviser_heatmap(
    adviser_justification_df: pd.DataFrame,
    file_name: Optional[str] = None,
    save_result: bool = False,
    output_dir: Optional[str] = None,
):
    """Create adviser justifications heatmap plot.

    Rows of the heatmap are justification messages, columns are the time intervals
    produced by `_aggregate_data_per_interval`, and cells hold the counts.

    :param adviser_justification_df: data frame as returned by `create_final_dataframe` per identifier.
    :param file_name: file name used in the name of files saved
    :param save_result: resulting plots created are stored in `output_dir`.
    :param output_dir: output directory (relative to the current working directory) where plots
        are stored if `save_result` is set to True. Without it nothing is saved and a warning is logged.
    :return: data frame backing the heatmap, indexed by justification message
    """
    data = _aggregate_data_per_interval(adviser_justification_df=adviser_justification_df)
    heatmaps_values = _create_heatmaps_values(
        input_data=data, advise_encoded_type=adviser_justification_df["jm_hash_id_encoded"].values
    )

    df_heatmap = pd.DataFrame(heatmaps_values)
    df_heatmap["interval"] = list(data.keys())
    df_heatmap = df_heatmap.set_index(["interval"])
    # After transposing, each row is an encoded advise type and each column an interval.
    df_heatmap = df_heatmap.transpose()

    # The first message seen for an encoded id is used as its row label.
    adviser_justifications_map = {}
    for encoded_id, message in zip(
        adviser_justification_df["jm_hash_id_encoded"], adviser_justification_df["message"]
    ):
        adviser_justifications_map.setdefault(encoded_id, message)

    df_heatmap["advise_type"] = [adviser_justifications_map[encoded_id] for encoded_id in df_heatmap.index]
    df_heatmap = df_heatmap.set_index(["advise_type"])

    plt.subplots(figsize=(15, 15))
    ax = sns.heatmap(df_heatmap, annot=True, fmt="g")
    plt.show()

    if save_result:
        if output_dir:
            project_dir_path = Path.cwd().joinpath(output_dir)
            os.makedirs(project_dir_path, exist_ok=True)

            if not file_name:
                file_name = ""

            fig = ax.get_figure()
            fig.savefig(
                project_dir_path / f"Adviser_justifications_{file_name}_{datetime.utcnow()}.png",
                bbox_inches="tight",
            )
        else:
            # Make the skipped save visible instead of failing or silently doing nothing.
            _LOGGER.warning("save_result is set but no output_dir was provided, heatmap is not saved")

    plt.close()

    return df_heatmap