Tuesday, March 31, 2026

 The following sample code gathers custom insights, on a periodic basis, into the GitHub issues opened against a repository:

#! /usr/bin/python

import os, requests, json, datetime, re

# Repository to report on, in "owner/repo" form (required).
REPO = os.environ["REPO"]

# Token sent as a Bearer credential on every GitHub API request (required).
TOKEN = os.environ["GH_TOKEN"]

# Length of the reporting window in days; one week unless overridden.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))

HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
    "X-GitHub-Api-Version": "2026-03-10",
}

# Start of the window as an ISO 8601 UTC timestamp (YYYY-MM-DDTHH:MM:SSZ).
# datetime.utcnow() is deprecated since Python 3.12, so the value is built
# from an aware "now" and formatted explicitly.
since = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=WINDOW_DAYS)
).strftime("%Y-%m-%dT%H:%M:%SZ")

# ---- Helpers ----

def gh_get(url, params=None, headers=None, timeout=30):
    """Send an authenticated GET request and return the decoded JSON body.

    Args:
        url: Absolute URL to request.
        params: Optional mapping of query-string parameters.
        headers: Optional extra request headers; merged over the module-level
            HEADERS for this call only.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the script indefinitely.

    Returns:
        The parsed JSON response body.

    Raises:
        requests.HTTPError: If the response status is 4xx or 5xx.
    """
    merged = {**HEADERS, **headers} if headers else HEADERS
    r = requests.get(url, headers=merged, params=params, timeout=timeout)
    r.raise_for_status()
    return r.json()

def gh_get_text(url, timeout=30):
    """Send an authenticated GET request and return the body as text.

    Args:
        url: Absolute URL to request.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the script indefinitely.

    Returns:
        The response body decoded to ``str``.

    Raises:
        requests.HTTPError: If the response status is 4xx or 5xx.
    """
    r = requests.get(url, headers=HEADERS, timeout=timeout)
    r.raise_for_status()
    return r.text

issues_url = f"https://api.github.com/repos/{REPO}/issues"
params = {"state": "closed", "since": since, "per_page": 100}

# Full PR links, and bare "#123" shorthand (assumed to point at this repo).
_PR_LINK_RE = re.compile(r"https://github\.com/[^/\s]+/[^/\s]+/pull/\d+")
_SHORTHAND_RE = re.compile(r"(?:^|\s)#(\d+)\b")


def _fetch_all_pages(url, query):
    """Return every item of a paginated GitHub list endpoint.

    A single request returns at most ``per_page`` items, so reading only the
    first page silently drops data on busy repositories.
    """
    collected = []
    page = 1
    while True:
        batch = gh_get(url, params={**query, "page": page})
        if not isinstance(batch, list) or not batch:
            break
        collected.extend(batch)
        # A short page is the last page; stopping here saves one request.
        if len(batch) < query.get("per_page", 30):
            break
        page += 1
    return collected


items = _fetch_all_pages(issues_url, params)

issues = []
for i in items:
    # The issues endpoint also returns pull requests; skip those.
    if "pull_request" in i:
        continue

    comments = _fetch_all_pages(i["comments_url"], {"per_page": 100})

    pr_urls = set()
    for c in comments:
        body = c.get("body", "") or ""
        pr_urls.update(_PR_LINK_RE.findall(body))
        pr_urls.update(
            f"https://github.com/{REPO}/pull/{num}" for num in _SHORTHAND_RE.findall(body)
        )

    issues.append({
        "number": i["number"],
        "title": i.get("title", ""),
        # "user" can be null in the payload, so guard before reading "login".
        "user": (i.get("user") or {}).get("login", ""),
        "created_at": i.get("created_at"),
        "closed_at": i.get("closed_at"),
        "html_url": i.get("html_url"),
        "comments": [
            {"id": c.get("id"), "body": c.get("body", ""), "created_at": c.get("created_at")}
            for c in comments
        ],
        "pr_urls": sorted(pr_urls),
    })

with open("issues.json", "w") as f:
    json.dump(issues, f, indent=2)

print(f"WROTE_ISSUES={len(issues)}")

import os, requests, datetime, pandas as pd

# Repository to report on, in "owner/repo" form (required).
REPO = os.environ["REPO"]

# Token sent as a Bearer credential on every GitHub API request (required).
TOKEN = os.environ["GH_TOKEN"]

# Length of the reporting window in days; one week unless overridden.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json",
}

# Start of the window as an ISO 8601 UTC timestamp (YYYY-MM-DDTHH:MM:SSZ).
# datetime.utcnow() is deprecated since Python 3.12, so the value is built
# from an aware "now" and formatted explicitly.
since = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=WINDOW_DAYS)
).strftime("%Y-%m-%dT%H:%M:%SZ")

url = f"https://api.github.com/repos/{REPO}/issues"

def fetch(state):
    """Return all issues (pull requests excluded) in `state` for the window.

    The request passes the module-level ``since`` value, so only items the
    API reports for that window are returned.

    Args:
        state: Issue state passed to the API, e.g. "open" or "closed".

    Returns:
        A list of issue dicts as returned by the API.

    Raises:
        requests.HTTPError: If any page request returns a 4xx/5xx status.
    """
    items = []
    page = 1
    while True:
        r = requests.get(
            url,
            headers=headers,
            params={"state": state, "since": since, "per_page": 100, "page": page},
            timeout=30,
        )
        r.raise_for_status()
        raw = r.json()
        items.extend(i for i in raw if "pull_request" not in i)
        # Decide whether to continue from the UNFILTERED page size: a full
        # page that happens to contain pull requests is not the last page.
        if len(raw) < 100:
            break
        page += 1
    return items

# Count issues per state and persist the two totals as a small CSV table.
opened = fetch("open")
closed = fetch("closed")

df = pd.DataFrame(
    {
        "metric": ["opened", "closed"],
        "count": [len(opened), len(closed)],
    }
)
df.to_csv("issue_activity.csv", index=False)

print(df)

import os, re, json, datetime, requests

import hcl2

import pandas as pd

# Repository to report on, in "owner/repo" form (required).
REPO = os.environ["GITHUB_REPOSITORY"]

# Token sent as a Bearer credential on every GitHub API request (required).
GH_TOKEN = os.environ["GH_TOKEN"]

HEADERS = {
    "Authorization": f"Bearer {GH_TOKEN}",
    "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
    "X-GitHub-Api-Version": "2026-03-10",
}

# ---- Time window (WINDOW_DAYS, default: last 7 days) ----
# The final report prints WINDOW_DAYS, so honor it here instead of a fixed 7.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))

# datetime.utcnow() is deprecated since Python 3.12; format an aware UTC
# "now" as YYYY-MM-DDTHH:MM:SSZ instead.
since = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=WINDOW_DAYS)
).strftime("%Y-%m-%dT%H:%M:%SZ")

# ---- Helpers ----

def list_closed_issues():
    """Return every closed issue for the window, excluding pull requests.

    The issues endpoint returns both issues and pull requests and hands out
    at most 100 items per request, so all pages are read and entries that
    carry a "pull_request" key are dropped.
    """
    url = f"https://api.github.com/repos/{REPO}/issues"
    closed = []
    page = 1
    while True:
        batch = gh_get(
            url,
            params={"state": "closed", "since": since, "per_page": 100, "page": page},
        )
        if not isinstance(batch, list) or not batch:
            break
        closed.extend(i for i in batch if "pull_request" not in i)
        # Compare the unfiltered size: a short page is the last page.
        if len(batch) < 100:
            break
        page += 1
    return closed

# Browser-facing PR links: http(s)://github.com/<owner>/<repo>/pull/<num>
PR_HTML_URL_RE = re.compile(r"https?://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pull/(?P<num>\d+)", flags=re.IGNORECASE)

# REST API PR links: http(s)://api.github.com/repos/<owner>/<repo>/pulls/<num>
PR_API_URL_RE = re.compile(r"https?://api\.github\.com/repos/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pulls/(?P<num>\d+)", flags=re.IGNORECASE)

# Shorthand references that may appear in free text:
#   #123             -> number only; callers treat it as the current repo
#   owner/repo#123   -> explicit cross-repo reference
# The look-behind keeps the same-repo pattern from firing on the "#123" part
# of "owner/repo#123" (the character before '#' is a word character there).
SHORTHAND_SAME_REPO_RE = re.compile(r"(?<!\w)#(?P<num>\d+)\b")
SHORTHAND_CROSS_REPO_RE = re.compile(r"(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+)#(?P<num>\d+)\b")

def _normalize_html_pr_url(owner: str, repo: str, num: int) -> str:

    return f"https://github.com/{owner}/{repo}/pull/{int(num)}"

def _collect_from_text(text: str, default_owner: str, default_repo: str) -> set:
    """Return the set of candidate PR URLs mentioned in `text`.

    Four reference forms are recognized: HTML PR links, API PR links,
    "owner/repo#N" shorthand and bare "#N" shorthand. Bare shorthand is
    attributed to `default_owner`/`default_repo`. Every hit is converted to
    the canonical HTML PR URL. Empty or missing text yields an empty set.
    """
    if not text:
        return set()

    # These three patterns all carry their own owner/repo groups.
    explicit_patterns = (PR_HTML_URL_RE, PR_API_URL_RE, SHORTHAND_CROSS_REPO_RE)
    urls = {
        _normalize_html_pr_url(hit.group("owner"), hit.group("repo"), hit.group("num"))
        for pattern in explicit_patterns
        for hit in pattern.finditer(text)
    }

    # Bare "#N" has no owner/repo of its own; fall back to the defaults.
    urls.update(
        _normalize_html_pr_url(default_owner, default_repo, hit.group("num"))
        for hit in SHORTHAND_SAME_REPO_RE.finditer(text)
    )
    return urls

def _paginate_gh_get(url, headers=None, per_page=100):
    """Yield the items of a paginated list endpoint, page after page.

    Pages are requested through ``gh_get`` starting at page 1. Iteration
    ends when a response is not a list, is empty, or holds fewer than
    `per_page` items.

    NOTE: `headers` is accepted but not forwarded to ``gh_get``.
    """
    page_no = 0
    while True:
        page_no += 1
        payload = gh_get(url, params={"per_page": per_page, "page": page_no})
        if not isinstance(payload, list) or not payload:
            return
        yield from payload
        if len(payload) < per_page:
            return

def _pr_html_url_from_cross_reference(entry):
    """Return source.issue.html_url of a 'cross-referenced' entry if it is a PR link.

    Returns None for any other event type, or when the URL is missing or
    does not contain "/pull/".
    """
    if entry.get("event") != "cross-referenced":
        return None
    source_issue = (entry.get("source") or {}).get("issue") or {}
    html_url = source_issue.get("html_url")
    if isinstance(html_url, str) and "/pull/" in html_url:
        return html_url
    return None


def extract_pr_urls_from_issue(issue_number: int):
    """Collect the pull-request URLs associated with an issue.

    The following sources are scanned:
      - the issue body
      - every issue comment
      - issue events (free text plus structured 'cross-referenced' entries)
      - the issue timeline (free-text fields, 'cross-referenced' entries,
        and each item's own ``html_url`` / ``url``)

    Args:
        issue_number: Number of the issue inside REPO ("owner/repo").

    Returns:
        A sorted list of unique, canonical HTML PR URLs.

    Requires ``REPO``, ``gh_get`` and ``_paginate_gh_get`` at module level.
    """
    owner, repo = REPO.split("/", 1)
    api_base = f"https://api.github.com/repos/{REPO}/issues/{issue_number}"
    pr_urls = set()

    # 1) Issue body
    issue = gh_get(api_base)
    if isinstance(issue, dict):
        pr_urls |= _collect_from_text(issue.get("body") or "", owner, repo)

    # 2) All comments
    for c in _paginate_gh_get(f"{api_base}/comments"):
        pr_urls |= _collect_from_text(c.get("body") or "", owner, repo)

    # 3) Issue events: free text plus structured cross references
    for ev in _paginate_gh_get(f"{api_base}/events"):
        if not isinstance(ev, dict):
            continue
        pr_urls |= _collect_from_text(ev.get("body") or "", owner, repo)
        linked = _pr_html_url_from_cross_reference(ev)
        if linked:
            pr_urls.add(linked)

    # 4) Timeline
    for item in _paginate_gh_get(f"{api_base}/timeline"):
        if not isinstance(item, dict):
            continue

        # Free-text scan of any plausible string field
        for key in ("body", "message", "title", "commit_message", "subject"):
            val = item.get(key)
            if isinstance(val, str):
                pr_urls |= _collect_from_text(val, owner, repo)

        linked = _pr_html_url_from_cross_reference(item)
        if linked:
            pr_urls.add(linked)

        # The item's own html_url may itself point at a PR.
        html_url = item.get("html_url")
        if isinstance(html_url, str) and "/pull/" in html_url:
            pr_urls.add(html_url)

        # The item's API-style url is converted to the HTML form.
        api_url = item.get("url")
        if isinstance(api_url, str):
            m = PR_API_URL_RE.search(api_url)
            if m:
                pr_urls.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))

    # Final pass: keep only HTML PR URLs and rebuild each one canonically so
    # variants such as a trailing "#issuecomment-..." or "http://" collapse.
    normalized = set()
    for candidate in pr_urls:
        m = PR_HTML_URL_RE.search(candidate)
        if m:
            normalized.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))
    return sorted(normalized)

def pr_number_from_url(u):
    """Return the integer after "/pull/" in `u`, or None if there is none."""
    hit = re.search(r"/pull/(\d+)", u)
    if hit is None:
        return None
    return int(hit.group(1))

def list_pr_files(pr_number):
    """Return the file entries changed by pull request `pr_number` in REPO.

    Pages of 100 entries are requested until a short (or empty) page shows
    up. Stopping on a short page avoids one extra request per pull request,
    which matters when running close to the API rate limit.
    """
    url = f"https://api.github.com/repos/{REPO}/pulls/{pr_number}/files"
    files = []
    page = 1
    while True:
        batch = gh_get(url, params={"per_page": 100, "page": page})
        if not batch:
            break
        files.extend(batch)
        if len(batch) < 100:
            break
        page += 1
    return files

def get_pr_head_sha(pr_number):
    """Fetch pull request `pr_number` from REPO and return its head["sha"]."""
    pull = gh_get(f"https://api.github.com/repos/{REPO}/pulls/{pr_number}")
    head = pull["head"]
    return head["sha"]

def get_file_at_sha(path, sha):
    """Return the text of `path` in REPO at ref `sha`, or None.

    The contents API is asked for the file's metadata and the text is then
    downloaded from its ``download_url``.

    Returns:
        The file text, or None when the path does not exist at that ref
        (HTTP 404), is not a regular file, or has no download URL.

    Raises:
        requests.HTTPError: For any non-404 error status.
    """
    from urllib.parse import quote

    # Quote the path so characters such as '#', '?' or spaces stay part of
    # the path instead of being read as URL syntax ('/' is kept as-is).
    url = f"https://api.github.com/repos/{REPO}/contents/{quote(path)}"
    r = requests.get(url, headers=HEADERS, params={"ref": sha}, timeout=30)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    data = r.json()
    # A directory listing comes back as a list; only a file dict is usable.
    if isinstance(data, dict) and data.get("type") == "file" and data.get("download_url"):
        return gh_get_text(data["download_url"])
    return None

def extract_module_term_from_source(src: str) -> str | None:

    """

    Given a module 'source' string, return the last path segment between the

    final '/' and the '?' (or end of string if '?' is absent).

    Examples:

      git::https://...//modules/container/kubernetes-service?ref=v4.0.15 -> 'kubernetes-service'

      ../modules/network/vnet -> 'vnet'

      registry- or other sources with no '/' -> returns None

    """

    if not isinstance(src, str) or not src:

        return None

    # Strip query string

    path = src.split('?', 1)[0]

    # For git:: URLs that include a double-slash path component ("//modules/..."),

    # keep the right-most path component regardless of scheme.

    # Normalize backslashes just in case.

    path = path.replace('\\', '/')

    # Remove trailing slashes

    path = path.rstrip('/')

    # Split and take last non-empty part

    parts = [p for p in path.split('/') if p]

    if not parts:

        return None

    return parts[-1]

def parse_module_terms_from_tf(tf_text):
    """Return the set of module terms used by the HCL text `tf_text`.

    Each module block's string ``source`` attribute is reduced to a term
    with ``extract_module_term_from_source``; empty results are skipped.
    Text that cannot be parsed yields an empty set.
    """
    found = set()
    try:
        parsed = hcl2.loads(tf_text)
    except Exception:
        # Best effort: unparseable text contributes no terms.
        return found

    # "module" is handled both as a list of {name: body} dicts and as a
    # single {name: body} dict; any other shape is ignored.
    module_entries = parsed.get("module", [])
    if isinstance(module_entries, dict):
        groups = [module_entries]
    elif isinstance(module_entries, list):
        groups = [entry for entry in module_entries if isinstance(entry, dict)]
    else:
        groups = []

    for group in groups:
        for block in group.values():
            if not isinstance(block, dict):
                continue
            source = block.get("source")
            if not isinstance(source, str):
                continue
            term = extract_module_term_from_source(source)
            if term:
                found.add(term)
    return found

def parse_module_sources_from_tf(tf_text):
    """Return the set of raw ``source`` strings of all module blocks.

    Covers blocks of the form: module "x" { source = "..." }.
    Text that cannot be parsed yields an empty set.
    """
    try:
        parsed = hcl2.loads(tf_text)
    except Exception:
        # Best effort: unparseable text contributes no sources.
        return set()

    # "module" is handled both as a list of {name: body} dicts and as a
    # single {name: body} dict; any other shape is ignored.
    module_entries = parsed.get("module", [])
    if isinstance(module_entries, dict):
        groups = [module_entries]
    elif isinstance(module_entries, list):
        groups = [entry for entry in module_entries if isinstance(entry, dict)]
    else:
        groups = []

    return {
        block.get("source")
        for group in groups
        for block in group.values()
        if isinstance(block, dict) and isinstance(block.get("source"), str)
    }

def normalize_local_module_path(source, app_dir):
    """Resolve a relative module `source` against `app_dir`.

    Only sources starting with "./" or "../" are treated as local; they are
    joined onto `app_dir` (e.g. "workload/appA") and normalized as POSIX
    paths. Any other source (registry, git, http, ...) returns None.
    """
    import posixpath

    is_local = source.startswith(("./", "../"))
    if not is_local:
        return None
    joined = posixpath.join(app_dir, source)
    return posixpath.normpath(joined)

def list_repo_tf_files_under(dir_path, sha):
    """List the ``.tf`` paths below `dir_path` in the tree of commit `sha`.

    Runs ``git ls-tree -r --name-only <sha> <dir_path>`` in the current
    working directory. Best effort: any failure yields an empty list.
    """
    import subprocess

    cmd = ["git", "ls-tree", "-r", "--name-only", sha, dir_path]
    try:
        listing = subprocess.check_output(cmd, text=True)
    except Exception:
        return []

    stripped = (line.strip() for line in listing.splitlines())
    return [name for name in stripped if name.endswith(".tf")]

def collect_module_terms_for_app(app_dir, sha):
    """Collect the module terms used by an app at commit `sha`.

    All ``.tf`` files below `app_dir` are scanned for:
      1) module terms used directly by the app, and
      2) local module sources; each referenced local module directory is
         then scanned one level deep for the module terms it uses itself.

    Returns:
        A set of module term strings.
    """
    import posixpath

    terms = set()
    module_dirs = set()

    for tf_path in list_repo_tf_files_under(app_dir, sha):
        txt = get_file_at_sha(tf_path, sha)
        if not txt:
            continue

        # Module terms used directly in this file
        terms |= parse_module_terms_from_tf(txt)

        # A relative module source is relative to the directory of the file
        # that declares it. The listing is recursive, so that directory can
        # be a sub-directory of app_dir rather than app_dir itself.
        base_dir = posixpath.dirname(tf_path) or app_dir
        for src in parse_module_sources_from_tf(txt):
            local = normalize_local_module_path(src, base_dir)
            if local:
                module_dirs.add(local)

    # Scan the local module directories (one level deep)
    for mdir in sorted(module_dirs):
        for tf_path in list_repo_tf_files_under(mdir, sha):
            txt = get_file_at_sha(tf_path, sha)
            if not txt:
                continue
            terms |= parse_module_terms_from_tf(txt)

    return terms

# ---- Main: issues -> PRs -> touched apps -> module terms ----

def _prs_in_this_repo(urls):
    """Return the sorted PR numbers from `urls` that belong to REPO.

    PR numbers are later looked up inside REPO, so links that point at
    other repositories must not contribute their numbers.
    """
    prefix = f"https://github.com/{REPO}/pull/".lower()
    numbers = set()
    for u in urls:
        if not u.lower().startswith(prefix):
            continue
        n = pr_number_from_url(u)
        if n:
            numbers.add(n)
    return sorted(numbers)


issues = list_closed_issues()

issue_to_terms = {}  # issue_number -> sorted list of module terms

for issue in issues:
    inum = issue["number"]
    pr_urls = extract_pr_urls_from_issue(inum)
    pr_numbers = _prs_in_this_repo(pr_urls)
    if not pr_numbers:
        continue

    terms_for_issue = set()
    for prn in pr_numbers:
        try:
            sha = get_pr_head_sha(prn)
            files = list_pr_files(prn)
        except requests.HTTPError as err:
            # A bare "#123" in text may name an issue rather than a PR; the
            # pulls endpoint then answers 404. Skip it instead of aborting.
            if err.response is not None and err.response.status_code == 404:
                continue
            raise

        # Identify which workload apps are touched by this PR.
        # Requirement: multiple application folders within "workload/".
        touched_apps = set()
        for f in files:
            path = f.get("filename", "")
            if not path.startswith("workload/"):
                continue
            parts = path.split("/")
            # At least workload/<app>/<file>: a file directly under
            # "workload/" is not an application folder.
            if len(parts) >= 3:
                touched_apps.add("/".join(parts[:2]))  # workload/<app>

        # For each touched app, compute module terms by scanning app + local modules.
        for app_dir in sorted(touched_apps):
            terms_for_issue |= collect_module_terms_for_app(app_dir, sha)

    if terms_for_issue:
        issue_to_terms[inum] = sorted(terms_for_issue)

# Build severity distribution: "severity" = number of issues touching each module term.
rows = [
    {"issue": inum, "module_term": t}
    for inum, terms in issue_to_terms.items()
    for t in set(terms)
]
print(f"rows={len(rows)}")

# Naming the columns keeps the header row even when there are no rows, so
# the CSV stays readable by pandas later on.
df = pd.DataFrame(rows, columns=["issue", "module_term"])
df.to_csv("severity_data.csv", index=False)

# Also write a compact JSON for debugging/audit.
with open("issue_to_module_terms.json", "w") as f:
    json.dump(issue_to_terms, f, indent=2, sort_keys=True)

print(f"Closed issues considered: {len(issues)}")
print(f"Issues with PR-linked module impact: {len(issue_to_terms)}")

import os, json, re, requests, subprocess

import hcl2

# Repository ("owner/repo") and API token are both required.
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]

# Headers sent with every GitHub API request made through HEADERS.
HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
    "X-GitHub-Api-Version": "2026-03-10",
}

import posixpath

import pandas as pd


def _turnaround_days(created, closed):
    """Return the days between two "YYYY-MM-DDTHH:MM:SSZ" stamps, or None.

    None is returned when either stamp is missing or cannot be parsed.
    """
    # Imported locally so the module-level name `datetime` is not rebound.
    from datetime import datetime

    if not (created and closed):
        return None
    fmt = "%Y-%m-%dT%H:%M:%SZ"
    try:
        elapsed = datetime.strptime(closed, fmt) - datetime.strptime(created, fmt)
    except (TypeError, ValueError):
        return None
    return elapsed.total_seconds() / 86400.0


def _local_pr_numbers(urls):
    """Return the sorted PR numbers from `urls` that belong to REPO.

    PR numbers are looked up inside REPO, so links that point at other
    repositories must not contribute their numbers.
    """
    prefix = f"https://github.com/{REPO}/pull/".lower()
    numbers = set()
    for u in urls:
        if not u.lower().startswith(prefix):
            continue
        n = pr_number_from_url(u)
        if n:
            numbers.add(n)
    return sorted(numbers)


with open("issues.json") as f:
    issues = json.load(f)

issue_to_terms = {}
issue_turnaround = {}
module_deps = {}  # app_dir -> set(module paths it references)

for issue in issues:
    inum = issue["number"]
    issue_turnaround[inum] = _turnaround_days(issue.get("created_at"), issue.get("closed_at"))

    pr_numbers = _local_pr_numbers(issue.get("pr_urls", []))
    terms_for_issue = set()

    for prn in pr_numbers:
        try:
            sha = get_pr_head_sha(prn)
            files = list_pr_files(prn)
        except requests.HTTPError as err:
            # A bare "#123" in a comment may name an issue rather than a PR;
            # the pulls endpoint then answers 404. Skip it instead of aborting.
            if err.response is not None and err.response.status_code == 404:
                continue
            raise

        touched_apps = set()
        for f in files:
            path = f.get("filename", "")
            if path.startswith("workload/"):
                parts = path.split("/")
                # At least workload/<app>/<file>: a file directly under
                # "workload/" is not an application folder.
                if len(parts) >= 3:
                    touched_apps.add("/".join(parts[:2]))

        for app_dir in sorted(touched_apps):
            terms_for_issue |= collect_module_terms_for_app(app_dir, sha)

            # Collect local module sources at the PR head for the dependency graph.
            for p in list_repo_tf_files_under(app_dir, sha):
                txt = get_file_at_sha(p, sha)
                if not txt:
                    continue
                # Relative sources are relative to the declaring file's directory.
                base_dir = posixpath.dirname(p) or app_dir
                for src in parse_module_sources_from_tf(txt):
                    local = normalize_local_module_path(src, base_dir)
                    if local:
                        module_deps.setdefault(app_dir, set()).add(local)

    if terms_for_issue:
        issue_to_terms[inum] = sorted(terms_for_issue)

rows = [
    {"issue": inum, "module_term": t}
    for inum, terms in issue_to_terms.items()
    for t in set(terms)
]

# Naming the columns keeps the header row even when there are no rows, so
# the CSVs stay readable by pandas in the reporting step.
df = pd.DataFrame(rows, columns=["issue", "module_term"])
df.to_csv("severity_data.csv", index=False)

ta_rows = [{"issue": inum, "turnaround_days": days} for inum, days in issue_turnaround.items()]
pd.DataFrame(ta_rows, columns=["issue", "turnaround_days"]).to_csv("turnaround.csv", index=False)

with open("issue_to_module_terms.json", "w") as f:
    json.dump(issue_to_terms, f, indent=2)

with open("issue_turnaround.json", "w") as f:
    json.dump(issue_turnaround, f, indent=2)

with open("module_deps.json", "w") as f:
    json.dump({k: sorted(list(v)) for k, v in module_deps.items()}, f, indent=2)

print(f"ISSUES_WITH_TYPES={len(issue_to_terms)}")

import os, json, datetime, glob

import pandas as pd

import matplotlib.pyplot as plt

import seaborn as sns

import networkx as nx

# UTC timestamp that tags every artifact written by this run.
# datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now"
# formats to exactly the same YYYYMMDD-HHMMSS string.
ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d-%H%M%S")

# All report artifacts are written below ./history.
os.makedirs("history", exist_ok=True)

def read_csv(file_name):
    """Read `file_name` into a DataFrame, or return None.

    None is returned when the file does not exist or cannot be read or
    parsed (for example a completely empty file).

    Args:
        file_name: Path of the CSV file to read. The path given by the
            caller is the one that is read.
    """
    if not os.path.exists(file_name):
        return None
    try:
        return pd.read_csv(file_name)
    except Exception:
        # Best effort: callers treat None as "no data available".
        return None

# --- Severity bar (existing) ---

if os.path.exists("severity_data.csv"):
    df = read_csv("severity_data.csv")
    # Compare with `is None`: `df == None` on a DataFrame is an element-wise
    # comparison whose truth value is ambiguous and raises ValueError.
    if df is None or not {"issue", "module_term"} <= set(df.columns):
        df = pd.DataFrame(columns=["issue", "module_term"])
    # Number of distinct issues per module term, most frequent first.
    counts = df.groupby("module_term")["issue"].nunique().sort_values(ascending=False)
else:
    counts = pd.Series(dtype=int)

png_sev = f"history/severity-by-module-{ts}.png"

plt.figure(figsize=(12,6))
if not counts.empty:
    counts.plot(kind="bar")
    plt.title("Issue frequency by module term")
    plt.xlabel("module_term")
    plt.ylabel("number of closed issues touching module term")
else:
    # Placeholder image so the report always has something to embed.
    plt.text(0.5, 0.5, "No module-impacting issues in window", ha="center", va="center")
    plt.axis("off")

plt.tight_layout()
plt.savefig(png_sev)
plt.clf()

# --- Heatmap: module_term x issue (binary or counts) ---

heat_png = f"history/heatmap-module-issues-{ts}.png"

mat = read_csv("severity_data.csv") if os.path.exists("severity_data.csv") else None
# Test for None explicitly: `not mat` on a DataFrame raises ValueError
# because a DataFrame has no single truth value.
if mat is None or not {"issue", "module_term"} <= set(mat.columns):
    mat = pd.DataFrame(columns=["issue", "module_term"])

if not mat.empty:
    # Rows: module terms, columns: issue numbers, cells: occurrence counts.
    pivot = mat.pivot_table(index="module_term", columns="issue", aggfunc='size', fill_value=0)

    # Order rows by their total count, busiest module term first.
    pivot['total'] = pivot.sum(axis=1)
    pivot = pivot.sort_values('total', ascending=False).drop(columns=['total'])

    # Limit columns for readability. Columns are ordered by issue number,
    # so the LAST 100 are the most recent issues.
    if pivot.shape[1] > 100:
        pivot = pivot.iloc[:, -100:]

    plt.figure(figsize=(14, max(6, 0.2 * pivot.shape[0])))
    sns.heatmap(pivot, cmap="YlOrRd", cbar=True)
    plt.title("Heatmap: module terms (rows) vs issues (columns)")
    plt.xlabel("Issue number (truncated)")
    plt.ylabel("module terms")
    plt.tight_layout()
else:
    # Placeholder image so the report always has something to embed.
    plt.figure(figsize=(6,2))
    plt.text(0.5,0.5,"No data for heatmap",ha="center",va="center")
    plt.axis("off")

plt.savefig(heat_png)
plt.clf()

# --- Trend lines: aggregate historical severity_data.csv files in history/ ---

import re

trend_png = f"history/trendlines-module-{ts}.png"

# Collect historical CSVs that match the severity_data naming patterns.
hist_files = sorted(glob.glob("history/*severity-data-*.csv") + glob.glob("history/*severity_data.csv") + glob.glob("history/*severity-by-module-*.csv"))

# Also include the current run's severity_data.csv.
if os.path.exists("severity_data.csv"):
    hist_files.append("severity_data.csv")

# One snapshot per file: distinct-issue count per module term, keyed by the
# time the snapshot was taken (from the file name, else the file's mtime).
_TS_IN_NAME = re.compile(r"(\d{8}-\d{6})")
snapshots = {}
for csv_path in hist_files:
    try:
        # Read the file named by the loop variable, not a fixed path.
        tmp = pd.read_csv(csv_path)
        stamp = _TS_IN_NAME.search(csv_path)
        if stamp:
            when = pd.to_datetime(stamp.group(1), format="%Y%m%d-%H%M%S")
        else:
            when = pd.to_datetime(os.path.getmtime(csv_path), unit="s")
    except (OSError, ValueError):
        # Unreadable, empty or oddly named snapshot: skip it.
        continue
    if tmp.empty or not {"module_term", "issue"} <= set(tmp.columns):
        continue
    snapshots[when] = tmp.groupby("module_term")["issue"].nunique()

trend_df = pd.DataFrame(snapshots)

if not trend_df.empty:
    # Rows become points in time (oldest first), columns are module terms.
    trend_df = trend_df.fillna(0).T.sort_index()

    plt.figure(figsize=(14,6))
    # Plot the top N module terms by their count in the latest snapshot.
    latest = trend_df.iloc[-1].sort_values(ascending=False).head(8).index.tolist()
    for col in latest:
        plt.plot(trend_df.index, trend_df[col], marker='o', label=col)
    plt.legend(loc='best', fontsize='small')
    plt.title("Trend lines: issue frequency over time for top module_terms")
    plt.xlabel("time")
    plt.ylabel("issue count")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(trend_png)
    plt.clf()
else:
    # Placeholder image so the report always has something to embed.
    plt.figure(figsize=(8,2))
    plt.text(0.5,0.5,"No historical data for trend lines",ha="center",va="center")
    plt.axis("off")
    plt.savefig(trend_png)
    plt.clf()

# --- Dependency graph: build directed graph from module_deps.json ---

dep_png = f"history/dependency-graph-{ts}.png"

# Build the app -> local-module graph when the JSON artifact is present.
G = None
if os.path.exists("module_deps.json"):
    with open("module_deps.json") as f:
        deps = json.load(f)

    G = nx.DiGraph()
    for app, mods in deps.items():
        G.add_node(app, type='app')
        for m in mods:
            G.add_node(m, type='module')
            G.add_edge(app, m)

if G is None or len(G.nodes) == 0:
    # Missing file and empty graph both get the same placeholder image.
    plt.figure(figsize=(6,2))
    plt.text(0.5,0.5,"No dependency data",ha="center",va="center")
    plt.axis("off")
else:
    plt.figure(figsize=(12,8))
    pos = nx.spring_layout(G, k=0.5, iterations=50)
    # Nodes tagged 'app' get one colour, every other node the second one.
    node_colors = []
    for n in G.nodes():
        is_app = G.nodes[n].get('type') == 'app'
        node_colors.append('#1f78b4' if is_app else '#33a02c')
    nx.draw_networkx_nodes(G, pos, node_size=600, node_color=node_colors)
    nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle='->', arrowsize=12, edge_color='#888888')
    nx.draw_networkx_labels(G, pos, font_size=8)
    plt.title("Module dependency graph (apps -> local modules)")
    plt.axis('off')
    plt.tight_layout()

plt.savefig(dep_png)
plt.clf()

# --- Turnaround chart (existing) ---

ta_png = f"history/turnaround-by-issue-{ts}.png"

# Load turnaround.csv directly so that exactly this file is read.
ta = None
if os.path.exists("turnaround.csv"):
    try:
        ta = pd.read_csv("turnaround.csv")
    except (OSError, ValueError):
        ta = None

# Compare with `is None`: `ta == None` on a DataFrame is an element-wise
# comparison whose truth value is ambiguous and raises ValueError.
if ta is None or not {"issue", "turnaround_days"} <= set(ta.columns):
    ta = pd.DataFrame(columns=["issue", "turnaround_days"])

# Issues without a computed turnaround cannot be plotted.
ta = ta.dropna(subset=["turnaround_days"])

if not ta.empty:
    # Show the 50 slowest issues, slowest first.
    ta_sorted = ta.sort_values("turnaround_days", ascending=False).head(50)
    plt.figure(figsize=(12,6))
    plt.bar(ta_sorted["issue"].astype(str), ta_sorted["turnaround_days"])
    plt.xticks(rotation=90)
    plt.title("Turnaround time (days) for closed issues in window")
    plt.xlabel("Issue number")
    plt.ylabel("Turnaround (days)")
    plt.tight_layout()
else:
    # Placeholder image so the report always has something to embed.
    plt.figure(figsize=(8,2))
    plt.text(0.5,0.5,"No turnaround data available",ha="center",va="center")
    plt.axis("off")

plt.savefig(ta_png)
plt.clf()

# --- Issue activity charts (opened vs closed) ---

activity_png = f"history/issue-activity-{ts}.png"

have_activity = os.path.exists("issue_activity.csv")
if have_activity:
    # One bar per row of the metric/count table.
    act = pd.read_csv("issue_activity.csv")
    plt.figure(figsize=(6,4))
    plt.bar(act["metric"], act["count"], color=["#1f78b4", "#33a02c"])
    plt.title("GitHub issue activity in last window")
    plt.xlabel("Issue state")
    plt.ylabel("Count")
    plt.tight_layout()
else:
    # Placeholder image when the CSV is missing.
    plt.figure(figsize=(6,2))
    plt.text(0.5, 0.5, "No issue activity data", ha="center", va="center")
    plt.axis("off")

plt.savefig(activity_png)
plt.clf()

# --- AI summary (who wants what) ---

import subprocess, os
import urllib.request

if os.path.exists("issues.json"):
    with open("issues.json") as f:
        issues = json.load(f)
else:
    issues = []

# Reduce each issue to the four fields the summary prompt describes.
condensed = [
    {
        "number": i.get("number"),
        "user": i.get("user"),
        "title": i.get("title"),
        "html_url": i.get("html_url"),
    }
    for i in issues
]

with open("issues_for_ai.json", "w") as f:
    json.dump(condensed, f, indent=2)

# Call OpenAI only when a key is present.
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
ai_text = "AI summary skipped (no OPENAI_API_KEY)."

if OPENAI_KEY:
    prompt = ("You are given a JSON array of GitHub issues with fields: number, user, title, html_url. "
              "Produce a concise list of one-line 'who wants what' statements, one per issue, in plain text. "
              "Format: '#<number> — <user> wants <succinct request derived from title>'. "
              "Do not add commentary.")
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "You are a concise summarizer."},
            # Real newlines separate the instructions from the data. The
            # JSON text is capped at 15000 characters to bound request size.
            {"role": "user", "content": prompt + "\n\nJSON:\n" + json.dumps(condensed)[:15000]},
        ],
        "temperature": 0.2,
        "max_tokens": 400,
    }
    # Send the request in-process: handing the key to curl on the command
    # line would expose it to anyone who can list running processes.
    request = urllib.request.Request(
        "https://api.openai.com/v1/chat/completions",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {OPENAI_KEY}",
        },
        method="POST",
    )
    try:
        with urllib.request.urlopen(request, timeout=60) as response:
            resp = json.loads(response.read().decode("utf-8"))
        ai_text = resp["choices"][0]["message"]["content"].strip()
    except OSError as err:
        # Covers URLError/HTTPError and timeouts. Say so explicitly rather
        # than leaving the "skipped" text in place.
        ai_text = f"AI summary unavailable (request failed: {err})."
    except (ValueError, KeyError, IndexError, TypeError, AttributeError):
        ai_text = "AI summary unavailable (parsing error)."

# --- Write markdown report combining all visuals ---

md_path = f"history/severity-report-{ts}.md"


def _image_md(png_path):
    """Return a markdown image line for `png_path`, referenced by file name only."""
    name = os.path.basename(png_path)
    return f"![{name}]({name})\n\n"


# The report contains non-ASCII text (em dashes, and whatever the AI summary
# returns), so the encoding is fixed instead of left to the platform default.
with open(md_path, "w", encoding="utf-8") as f:
    f.write("# Weekly Terraform module hotspot report\n\n")
    f.write(f"**Window (days):** {os.environ.get('WINDOW_DAYS','7')}\n\n")

    f.write("## AI Summary (who wants what)\n\n")
    f.write("```\n")
    f.write(ai_text + "\n")
    f.write("```\n\n")

    f.write("## GitHub issue activity (last window)\n\n")
    f.write(_image_md(activity_png))
    if os.path.exists("issue_activity.csv"):
        act = pd.read_csv("issue_activity.csv")
        f.write(act.to_markdown(index=False) + "\n\n")

    f.write("## Top module terms by issue frequency\n\n")
    if not counts.empty:
        f.write(_image_md(png_sev))
        f.write(counts.head(30).to_frame("issues").to_markdown() + "\n\n")
    else:
        f.write("No module-impacting issues found in the selected window.\n\n")

    f.write("## Heatmap: module terms vs issues\n\n")
    f.write(_image_md(heat_png))

    f.write("## Trend lines: historical issue frequency for top module terms\n\n")
    f.write(_image_md(trend_png))

    f.write("## Dependency graph: apps -> local modules\n\n")
    f.write(_image_md(dep_png))

    f.write("## Turnaround time for closed issues (days)\n\n")
    f.write(_image_md(ta_png))

    f.write("## Data artifacts\n\n")
    f.write("- `severity_data.csv` — per-issue module term mapping\n")
    f.write("- `turnaround.csv` — per-issue turnaround in days\n")
    f.write("- `issue_to_module_terms.json` — mapping used to build charts\n")
    f.write("- `module_deps.json` — module dependency data used for graph\n")

# Save current CSVs into history with timestamp for future trend aggregation

# Copy this run's CSVs into history/ under the run timestamp; later runs
# read these copies to draw the trend lines.
try:
    import shutil
    if os.path.exists("severity_data.csv"):
        shutil.copy("severity_data.csv", f"history/severity-data-{ts}.csv")
    if os.path.exists("turnaround.csv"):
        shutil.copy("turnaround.csv", f"history/turnaround-{ts}.csv")
except OSError as err:
    # Still best effort, but report it: a silent failure here would quietly
    # leave gaps in the trend history.
    print(f"WARNING: could not archive CSVs into history/: {err}")

# Paths of the generated artifacts, one KEY=value line each.
print(f"REPORT_MD={md_path}")
print(f"REPORT_PNG={png_sev}")
print(f"REPORT_HEAT={heat_png}")
print(f"REPORT_TREND={trend_png}")
print(f"REPORT_DEP={dep_png}")
print(f"REPORT_TA={ta_png}")

import os, re

from pathlib import Path

hist = Path("history")
hist.mkdir(exist_ok=True)

# Report artifacts are named <kind>-YYYYMMDD-HHMMSS.(md|png). All kinds the
# report step writes are listed, so a whole report set (markdown plus its
# charts) is kept or pruned together. CSV snapshots are deliberately not
# matched: they feed the trend lines.
pat = re.compile(
    r"^(?:severity-by-module|severity-report|heatmap-module-issues|trendlines-module"
    r"|dependency-graph|turnaround-by-issue|issue-activity)"
    r"-(\d{8}-\d{6})\.(md|png)$"
)


def prune_history(directory, keep=10):
    """Delete report files in `directory` beyond the newest `keep` timestamps.

    Args:
        directory: ``pathlib.Path`` of the folder holding the report files.
        keep: Number of most recent timestamps (report sets) to retain.

    Returns:
        A ``(kept_sets, pruned_files)`` tuple of counts.
    """
    groups = {}
    for p in directory.iterdir():
        m = pat.match(p.name)
        if not m or not p.is_file():
            continue
        groups.setdefault(m.group(1), []).append(p)

    # The timestamp format sorts chronologically as plain text.
    newest = set(sorted(groups, reverse=True)[:keep])
    stale = [p for stamp, files in groups.items() if stamp not in newest for p in files]
    for p in stale:
        p.unlink()
    return len(newest), len(stale)


kept_sets, pruned_files = prune_history(hist)
print(f"Kept {kept_sets} report sets; pruned {pruned_files} files.")

---

Running these scripts produces the JSON and CSV files mentioned above. Sample output for just one of them, the issue activity table, is shown below:

                  metric count

0 #opened 8

1 #closed 8

Care must be taken not to run into GitHub API rate limits. When the limit is exceeded, the API responds with an error such as:

{"message": "API rate limit exceeded for <client-ip-address>", "documentation_url": "https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}


No comments:

Post a Comment