# thoth-lab
# Copyright(C) 2020 Francesco Murdaca
#
# This program is free software: you can redistribute it and / or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Security results processing and analysis."""
import logging
from pathlib import Path
from typing import List, Optional, Tuple, Dict
import pandas as pd
from plotly import graph_objs as go
from plotly.offline import iplot
from thoth.python import Source
from .common import aggregate_thoth_results
_LOGGER = logging.getLogger("thoth.lab.security")
logging.basicConfig(level=logging.INFO)
[docs]class SecurityIndicators:
    """Class of methods used to analyze Security Indicators (SI)."""
    # SI-bandit
[docs]    @staticmethod
    def aggregate_security_indicator_bandit_results(
        limit_results: bool = False,
        max_ids: int = 5,
        is_local: bool = True,
        security_indicator_bandit_repo_path: Path = Path("security/si-bandit"),
    ) -> list:
        """Aggregate si_bandit results from jsons stored in Ceph or locally from `si_bandit` repo.
        :param limit_results: reduce the number of si_bandit reports ids considered to `max_ids` to test analysis
        :param max_ids: maximum number of si_bandit reports ids considered
        :param is_local: flag to retreive the dataset locally or from S3 (credentials are required)
        :param si_bandit_repo_path: path to retrieve the si_bandit dataset locally and `is_local` is set to True
        """
        security_indicator_bandit_reports = aggregate_thoth_results(
            limit_results=limit_results,
            max_ids=max_ids,
            is_local=is_local,
            repo_path=security_indicator_bandit_repo_path,
            store_name="si-bandit",
        )
        return security_indicator_bandit_reports 
[docs]    def create_security_confidence_dataframe(
        self, si_bandit_report: dict, filters_files: Optional[List[str]] = None
    ) -> Tuple[pd.DataFrame, Dict[str, int]]:
        """Create Security/Confidence dataframe for si-bandit report."""
        results_sec_conf, summary_files = self.extract_severity_confidence_info(
            si_bandit_report=si_bandit_report, filters_files=filters_files
        )
        summary_df = pd.DataFrame()
        if results_sec_conf:
            summary_df = pd.json_normalize(results_sec_conf, sep="__").set_index("name")
        else:
            summary_df = pd.json_normalize(results_sec_conf, sep="__")
        summary_df["_total_severity"] = summary_df.sum(axis=1)
        sec_conf_df = summary_df.transpose()
        sec_conf_df["_total"] = sec_conf_df.sum(axis=1)
        return sec_conf_df, summary_files 
[docs]    @staticmethod
    def produce_si_bandit_report_summary_dataframe(
        metadata_df: pd.DataFrame, si_bandit_sec_conf_df: pd.DataFrame, summary_files: Dict[str, int]
    ) -> pd.DataFrame:
        """Create si-bandit report summary dataframe."""
        subset_df = pd.DataFrame([si_bandit_sec_conf_df["_total"].to_dict()])
        report_summary_df = pd.concat([metadata_df, subset_df], axis=1)
        report_summary_df["number_of_files_with_severities"] = pd.to_numeric(
            summary_files["number_of_files_with_severities"]
        )
        report_summary_df["number_of_analyzed_files"] = pd.to_numeric(summary_files["number_of_analyzed_files"])
        report_summary_df["number_of_filtered_files"] = pd.to_numeric(summary_files["number_of_filtered_files"])
        report_summary_df["number_of_files_total"] = pd.to_numeric(
            summary_files["number_of_filtered_files"]
        ) + pd.to_numeric(summary_files["number_of_analyzed_files"])
        report_summary_df["_total_severity"] = pd.to_numeric(report_summary_df["_total_severity"])
        return report_summary_df 
[docs]    @staticmethod
    def add_release_date(metadata_df: pd.DataFrame) -> pd.DataFrame:
        """Add release date to metadata."""
        package_name = metadata_df["package_name"][0]
        package_version = metadata_df["package_version"][0]
        package_index = metadata_df["package_index"][0]
        _LOGGER.debug("consider index %r", package_index)
        source = Source(package_index)
        _LOGGER.debug("Obtaining %r versions", package_name)
        release_date = source.get_package_release_date(package_name=package_name, package_version=package_version)
        metadata_df["release_date"] = pd.Timestamp(release_date, unit="s")
        return metadata_df 
[docs]    def create_si_bandit_final_dataframe(
        self,
        si_bandit_reports: List[dict],
        use_external_source_data: bool = False,
        filters_files: Optional[List[str]] = None,
    ) -> pd.DataFrame:
        """Create final si-bandit dataframe."""
        counter = 1
        final_df = pd.DataFrame()
        total_reports = len(si_bandit_reports)
        for si_bandit_report in si_bandit_reports:
            _LOGGER.info(f"Analyzing SI-bandit report: {counter}/{total_reports}")
            # Create metadata dataframe
            metadata_df = self.create_si_bandit_metadata_dataframe(si_bandit_report=si_bandit_report)
            _LOGGER.info(f"Analyzing package_name: {metadata_df['package_name'][0]}")
            _LOGGER.info(f"Analyzing package_version: {metadata_df['package_version'][0]}")
            _LOGGER.info(f"Analyzing package_index: {metadata_df['package_index'][0]}")
            if use_external_source_data:
                try:
                    metadata_df = self.add_release_date(metadata_df=metadata_df)
                except Exception as e:
                    _LOGGER.warning(e)
                    pass
            # Create Security/Confidence dataframe
            security_confidence_df, summary_files = self.create_security_confidence_dataframe(
                si_bandit_report=si_bandit_report, filters_files=filters_files
            )
            si_bandit_report_summary_df = self.produce_si_bandit_report_summary_dataframe(
                metadata_df=metadata_df, si_bandit_sec_conf_df=security_confidence_df, summary_files=summary_files
            )
            final_df = pd.concat([final_df, si_bandit_report_summary_df], axis=0)
            counter += 1
        return final_df 
[docs]    @staticmethod
    def create_package_releases_vulnerabilities_trend(
        si_bandit_df: pd.DataFrame,
        package_name: str,
        package_index: str,
        security_infos: Optional[List[str]] = None,
        show_vulnerability_data: bool = False,
    ):
        """Plot vulnerabilites trend for a Python package from a certain index.
        :param si_bandit_df: pandas dataframe given by 'create_si_bandit_final_dataframe' method
        with `use_external_source_data` set to True.
        :param package_name: Python Package name filter
        :param package_index: Python Package index filter
        :param security_infos: list of info to be visualized in the plot
        :param show_vulnerability_data: show all data regarding vulnerabilites if set to True
        """
        package_summary_df = si_bandit_df[
            (si_bandit_df["package_name"] == package_name) & (si_bandit_df["package_index"] == package_index)
        ]
        package_summary_df = package_summary_df.sort_values(by=["release_date"], ascending=True)
        X = package_summary_df["package_version"]  # noqa N806
        data = []
        if show_vulnerability_data:
            vulnerabilites_classes = [col for col in package_summary_df if col.startswith("SEVERITY.")]
            for vulnerability_class in vulnerabilites_classes:
                subset_df = package_summary_df[[vulnerability_class]]
                if subset_df.values.any():
                    Z = [z[0] for z in subset_df.values]  # noqa N806
                    trace = go.Scatter(
                        x=X, y=Z, mode="markers+lines", marker=dict(size=4, opacity=0.8), name=f"{vulnerability_class}"
                    )
                    data.append(trace)
        if not security_infos:
            security_infos = ["_total_severity"]
        for security_info in security_infos:
            subset_df = package_summary_df[[security_info]]
            Z = [z[0] for z in subset_df.values]  # noqa N806
            trace = go.Scatter(
                x=X, y=Z, mode="markers+lines", marker=dict(size=4, opacity=0.8), name=f"{security_info}"
            )
            data.append(trace)
        layout = go.Layout(
            title=f"SI analysis for {package_name} from {package_index} using SI-bandit",
            xaxis=dict(title="Releases"),
            yaxis=dict(title="Security scores and sub_scores"),
            showlegend=True,
            legend=dict(orientation="h", y=-0.7, yanchor="top"),
        )
        fig = go.Figure(data=data, layout=layout)
        iplot(fig, filename="scatter-colorscale") 
[docs]    @staticmethod
    def create_vulnerabilities_plot(
        security_df: pd.DataFrame, security_infos: Optional[List[str]] = None, show_vulnerability_data: bool = False
    ) -> None:
        """Plot vulnerabilites trend for a Python package from a certain index.
        :param security_df: pandas dataframe given by 'create_si_bandit_final_dataframe' method
        with `use_external_source_data` set to True.
        :param security_infos: list of info to be visualized in the plot
        :param show_vulnerability_data: show all data regarding vulnerabilites if set to True
        """
        if not security_infos:
            security_infos = ["_total_severity"]
        packages = []
        vulnerabilites = {}
        for column in security_infos:
            vulnerabilites[column] = []
        for index, row in security_df[["package_name", "package_version", "package_index"] + security_infos].iterrows():
            package_name = row["package_name"]
            package_version = row["package_version"]
            package_index = row["package_index"]
            packages.append(f"{package_name}-{package_version}-{package_index}")
            for column in security_infos:
                vulnerabilites[column].append(row[column])
        data = []
        for vulnerability_class in vulnerabilites:
            trace = go.Scatter(
                x=packages,
                y=vulnerabilites[vulnerability_class],
                mode="markers",
                marker=dict(size=4, opacity=0.8),
                name=f"{vulnerability_class}",
            )
            data.append(trace)
        layout = go.Layout(
            title="SI analysis for Python packages using SI-bandit",
            xaxis=dict(title="{package_name-package_version-package_index}"),
            showlegend=True,
        )
        fig = go.Figure(data=data, layout=layout)
        iplot(fig, filename="scatter-colorscale") 
[docs]    @staticmethod
    def define_si_scores(si_bandit_df: pd.DataFrame) -> pd.DataFrame():
        """Define security scores from si bandit outputs.
        WARNING: It depends on all data considered.
        """
        HIGH_CONFIDENCE_WEIGHT = 1  # noqa N806
        MEDIUM_CONFIDENCE_WEIGHT = 0.5  # noqa N806
        LOW_CONFIDENCE_WEIGHT = 0.1  # noqa N806
        for security in ["LOW", "MEDIUM", "HIGH"]:
            for confidence in ["LOW", "MEDIUM", "HIGH"]:
                q = f"SEVERITY.{security}__CONFIDENCE.{confidence}"
                min_max_scaler = (si_bandit_df[q] - si_bandit_df[q].min()) / (
                    si_bandit_df[q].max() - si_bandit_df[q].min()
                )
                si_bandit_df[f"{q}_scaled"] = min_max_scaler
        si_bandit_df["SEVERITY.HIGH.sub_score"] = (
            si_bandit_df["SEVERITY.HIGH__CONFIDENCE.HIGH"].fillna(0) * HIGH_CONFIDENCE_WEIGHT
            + si_bandit_df["SEVERITY.HIGH__CONFIDENCE.MEDIUM"].fillna(0) * MEDIUM_CONFIDENCE_WEIGHT
            + si_bandit_df["SEVERITY.HIGH__CONFIDENCE.LOW"].fillna(0) * LOW_CONFIDENCE_WEIGHT
        ) / 3
        si_bandit_df["SEVERITY.MEDIUM.sub_score"] = (
            si_bandit_df["SEVERITY.MEDIUM__CONFIDENCE.HIGH_scaled"].fillna(0) * HIGH_CONFIDENCE_WEIGHT
            + si_bandit_df["SEVERITY.MEDIUM__CONFIDENCE.MEDIUM_scaled"].fillna(0) * MEDIUM_CONFIDENCE_WEIGHT
            + si_bandit_df["SEVERITY.MEDIUM__CONFIDENCE.LOW_scaled"].fillna(0) * LOW_CONFIDENCE_WEIGHT
        ) / 3
        si_bandit_df["SEVERITY.LOW.sub_score"] = (
            si_bandit_df["SEVERITY.LOW__CONFIDENCE.HIGH_scaled"].fillna(0) * HIGH_CONFIDENCE_WEIGHT
            + si_bandit_df["SEVERITY.LOW__CONFIDENCE.MEDIUM_scaled"].fillna(0) * MEDIUM_CONFIDENCE_WEIGHT
            + si_bandit_df["SEVERITY.LOW__CONFIDENCE.LOW_scaled"].fillna(0) * LOW_CONFIDENCE_WEIGHT
        ) / 3
        HIGH_SEVERITY_WEIGHT = 100  # noqa N806
        MEDIUM_SEVERITY_WEIGHT = 10  # noqa N806
        LOW_SEVERITY_WEIGHT = 1  # noqa N806
        si_bandit_df["SEVERITY.score"] = (
            si_bandit_df["SEVERITY.HIGH.sub_score"] * HIGH_SEVERITY_WEIGHT
            + si_bandit_df["SEVERITY.MEDIUM.sub_score"] * MEDIUM_SEVERITY_WEIGHT
            + si_bandit_df["SEVERITY.LOW.sub_score"] * LOW_SEVERITY_WEIGHT
        ) / 3
        si_bandit_df["SEVERITY.score.normalized"] = (
            si_bandit_df["SEVERITY.score"] / si_bandit_df["number_of_analyzed_files"].max()
        )
        return si_bandit_df 
    # SI-cloc
[docs]    @staticmethod
    def aggregate_security_indicator_cloc_results(
        limit_results: bool = False,
        max_ids: int = 5,
        is_local: bool = True,
        security_indicator_cloc_repo_path: Path = Path("security/si-cloc"),
    ) -> list:
        """Aggregate si_cloc results from jsons stored in Ceph or locally from `si_cloc` repo.
        :param limit_results: reduce the number of si_cloc reports ids considered to `max_ids` to test analysis
        :param max_ids: maximum number of si_cloc reports ids considered
        :param is_local: flag to retreive the dataset locally or from S3 (credentials are required)
        :param si_cloc_repo_path: path to retrieve the si_cloc dataset locally and `is_local` is set to True
        """
        security_indicator_cloc_reports = aggregate_thoth_results(
            limit_results=limit_results,
            max_ids=max_ids,
            is_local=is_local,
            repo_path=security_indicator_cloc_repo_path,
            store_name="si-cloc",
        )
        return security_indicator_cloc_reports 
[docs]    def create_si_cloc_results_dataframe(self, si_cloc_report: dict) -> pd.DataFrame:
        """Create si-cloc report results dataframe."""
        results = {k: v for k, v in si_cloc_report["result"].items() if k != "header"}
        results["SUM"]["n_lines"] = si_cloc_report["result"]["header"]["n_lines"]
        results_df = pd.json_normalize(results)
        return results_df 
[docs]    @staticmethod
    def produce_si_cloc_report_summary_dataframe(
        metadata_df: pd.DataFrame, cloc_results_df: pd.DataFrame
    ) -> pd.DataFrame:
        """Create si-cloc report summary dataframe."""
        report_summary_df = pd.concat([metadata_df, cloc_results_df], axis=1)
        return report_summary_df 
[docs]    def create_si_cloc_final_dataframe(self, si_cloc_reports: list) -> pd.DataFrame:
        """Create final si-cloc dataframe."""
        counter = 1
        total_reports = len(si_cloc_reports)
        final_df = pd.DataFrame()
        for si_cloc_report in si_cloc_reports:
            _LOGGER.info(f"Analyzing SI-cloc report: {counter}/{total_reports}")
            # Create metadata dataframe
            metadata_df = self.create_si_cloc_metadata_dataframe(si_cloc_report)
            _LOGGER.info(f"Analyzing package_name: {metadata_df['package_name'][0]}")
            _LOGGER.info(f"Analyzing package_version: {metadata_df['package_version'][0]}")
            _LOGGER.info(f"Analyzing package_index: {metadata_df['package_index'][0]}")
            # Create Security/Confidence dataframe
            cloc_results_df = self.create_si_cloc_results_dataframe(si_cloc_report=si_cloc_report)
            si_cloc_report_summary_df = self.produce_si_cloc_report_summary_dataframe(
                metadata_df=metadata_df, cloc_results_df=cloc_results_df
            )
            final_df = pd.concat([final_df, si_cloc_report_summary_df], axis=0)
            counter += 1
        return final_df