Thursday, April 2, 2026

The following is sample code for periodically deriving custom insights from GitHub issues opened against a repository:

#! /usr/bin/python

# Fetch recently-closed GitHub issues (and their comments) for a repository.

import os, requests, json, datetime, re

# "owner/repo" slug of the repository to analyze (required env var).
REPO = os.environ["REPO"]

# GitHub token used as a Bearer credential for the REST API (required env var).
TOKEN = os.environ["GH_TOKEN"]

# Look-back window in days; defaults to one week.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS","7"))

# Shared headers: auth, JSON media type (plus the historical timeline preview
# media type), and an API version pin.
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json", "X-GitHub-Api-Version": "2026-03-10"}

# ISO-8601 UTC lower bound for the `since` query parameter.
# NOTE(review): datetime.utcnow() is naive; the trailing "Z" is appended manually.
since = (datetime.datetime.utcnow() - datetime.timedelta(days=WINDOW_DAYS)).isoformat() + "Z"

 

# ---- Helpers ---- 

def gh_get(url, params=None, ignore_status_codes=None):
  """GET a GitHub API URL and return the decoded JSON body.

  ignore_status_codes may be a single int or an iterable of ints; when the
  response status is one of them, None is returned instead of raising.
  Any other non-2xx status raises requests.HTTPError.
  """
  response = requests.get(url, headers=HEADERS, params=params)
  if ignore_status_codes is not None:
    if isinstance(ignore_status_codes, int):
      ignored = {ignore_status_codes}
    else:
      ignored = set(ignore_status_codes)
    if response.status_code in ignored:
      return None
  response.raise_for_status()
  return response.json()

 

def gh_get_text(url):
  """GET a URL with the shared auth headers and return the raw response text."""
  resp = requests.get(url, headers=HEADERS)
  resp.raise_for_status()
  return resp.text

 

# ---- Fetch closed issues (with comments) updated in the window ----
issues_url = f"https://api.github.com/repos/{REPO}/issues"
params = {"state":"closed","since":since,"per_page":100}

# BUG FIX: the original fetched only the first page (at most 100 results).
# Page through the listing until an empty or short page is returned.
items = []
page = 1
while True:
  batch = gh_get(issues_url, params=dict(params, page=page))
  if not batch:
    break
  items.extend(batch)
  if len(batch) < 100:
    break
  page += 1

issues = []
for i in items:
  # The issues listing also returns pull requests; skip them.
  if "pull_request" in i:
    continue
  # BUG FIX: comments were also capped at the first 100; page through them.
  comments = []
  cpage = 1
  while True:
    cbatch = gh_get(i["comments_url"], params={"per_page":100, "page":cpage})
    if not cbatch:
      break
    comments.extend(cbatch)
    if len(cbatch) < 100:
      break
    cpage += 1
  # Collect PR references from comment bodies: full URLs and #123 shorthand.
  pr_urls = set()
  for c in comments:
    body = c.get("body","") or ""
    for m in re.findall(r"https://github\.com/[^/\s]+/[^/\s]+/pull/\d+", body):
      pr_urls.add(m)
    for m in re.findall(r"(?:^|\s)#(\d+)\b", body):
      # Same-repo shorthand; assumed to refer to a PR in this repo.
      pr_urls.add(f"https://github.com/{REPO}/pull/{m}")
  issues.append({
    "number": i["number"],
    "title": i.get("title",""),
    "user": i.get("user",{}).get("login",""),
    "created_at": i.get("created_at"),
    "closed_at": i.get("closed_at"),
    "html_url": i.get("html_url"),
    "comments": [{"id":c.get("id"), "body":c.get("body",""), "created_at":c.get("created_at")} for c in comments],
    "pr_urls": sorted(pr_urls)
  })

# Snapshot consumed by later passes.
with open("issues.json","w") as f:
  json.dump(issues, f, indent=2)
print(f"WROTE_ISSUES={len(issues)}")

 

# ---- Second pass: opened vs closed issue counts over the window ----
import os, requests, datetime, pandas as pd

# Repository slug and API token (required environment variables).
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
# Look-back window in days; defaults to one week.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))

headers = {
  "Authorization": f"Bearer {TOKEN}",
  "Accept": "application/vnd.github+json",
}

# ISO-8601 UTC lower bound used as the `since` filter.
since = (datetime.datetime.utcnow() - datetime.timedelta(days=WINDOW_DAYS)).isoformat() + "Z"
url = f"https://api.github.com/repos/{REPO}/issues"

 

def fetch(state):
  """Return all issues (pull requests excluded) in `state` updated since `since`.

  Pages through the REST listing 100 at a time.

  BUG FIX: pagination decisions are now made on the *raw* page size. The
  original filtered out pull requests first, so a page consisting solely of
  PRs (empty after filtering) or a filtered page shorter than 100 ended the
  loop early and silently dropped later results.
  """
  items = []
  page = 1
  while True:
    r = requests.get(
      url,
      headers=headers,
      params={"state": state, "since": since, "per_page": 100, "page": page},
    )
    r.raise_for_status()
    raw = r.json()
    if not raw:
      break
    # The issues endpoint mixes PRs into the listing; keep real issues only.
    items.extend(i for i in raw if "pull_request" not in i)
    if len(raw) < 100:
      break
    page += 1
  return items

 

# Issue counts for the window, one API sweep per state.
opened = fetch("open")
closed = fetch("closed")

# Two-row summary table: one row per metric.
df = pd.DataFrame(
  [
    {"metric": "opened", "count": len(opened)},
    {"metric": "closed", "count": len(closed)},
  ]
)

# Persist for the downstream report step, and echo for the build log.
df.to_csv("issue_activity.csv", index=False)
print(df)

 

# ---- Third pass: map closed issues to Terraform module "terms" via PRs ----
import os, re, json, datetime, requests
import hcl2
import pandas as pd

# NOTE(review): this section reads GITHUB_REPOSITORY while earlier sections
# read REPO — confirm both are set in the workflow environment.
REPO = os.environ["GITHUB_REPOSITORY"]
GH_TOKEN = os.environ["GH_TOKEN"]
# Auth + JSON media type (incl. historical timeline preview) + version pin.
HEADERS = {"Authorization": f"Bearer {GH_TOKEN}", "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json", "X-GitHub-Api-Version": "2026-03-10"}

# ---- Time window (last 7 days) ----
since = (datetime.datetime.utcnow() - datetime.timedelta(days=7)).isoformat() + "Z"

 

# ---- Helpers ---- 

def list_closed_issues():
  """Return closed issues (pull requests excluded) updated in the window."""
  # The issues endpoint also returns PRs; drop anything carrying the
  # "pull_request" marker key.
  listing_url = f"https://api.github.com/repos/{REPO}/issues"
  results = gh_get(listing_url, params={"state":"closed","since":since,"per_page":100})
  return [entry for entry in results if "pull_request" not in entry]

 

PR_HTML_URL_RE = re.compile( 

    r"https?://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pull/(?P<num>\d+)", 

    re.IGNORECASE, 

) 

PR_API_URL_RE = re.compile( 

    r"https?://api\.github\.com/repos/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pulls/(?P<num>\d+)", 

    re.IGNORECASE, 

) 

 

# Shorthand references that might appear in text: 

#   - #123  (assumed to be same repo) 

#   - owner/repo#123 (explicit cross-repo) 

SHORTHAND_SAME_REPO_RE = re.compile(r"(?<!\w)#(?P<num>\d+)\b") 

SHORTHAND_CROSS_REPO_RE = re.compile( 

    r"(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+)#(?P<num>\d+)\b" 

) 

 

def _normalize_html_pr_url(owner: str, repo: str, num: int) -> str: 

    return f"https://github.com/{owner}/{repo}/pull/{int(num)}" 

 

def _collect_from_text(text: str, default_owner: str, default_repo: str) -> set: 

    """Extract candidate PR URLs from free text (body/comments/events text).""" 

    found = set() 

    if not text: 

        return found 

  

    # 1) Direct HTML PR URLs 

    for m in PR_HTML_URL_RE.finditer(text): 

        found.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num"))) 

 

    # 2) API PR URLs -> convert to HTML 

    for m in PR_API_URL_RE.finditer(text): 

        found.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num"))) 

 

    # 3) Cross-repo shorthand: owner/repo#123 (we will treat it as PR URL candidate) 

    for m in SHORTHAND_CROSS_REPO_RE.finditer(text): 

        found.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num"))) 

 

    # 4) Same-repo shorthand: #123 

    for m in SHORTHAND_SAME_REPO_RE.finditer(text): 

        found.add(_normalize_html_pr_url(default_ownerdefault_repom.group("num"))) 

 

    return found 

 

def _paginate_gh_get(url, headers=None, per_page=100):
    """Yield items from every page of a GitHub list endpoint.

    Stops once a page is missing/not a list, empty, or shorter than
    `per_page`. (`headers` is accepted for interface compatibility but is
    unused; gh_get applies the shared HEADERS itself.)
    """
    page_no = 1
    while True:
        payload = gh_get(url, params={"per_page": per_page, "page": page_no})
        if not isinstance(payload, list) or not payload:
            return
        yield from payload
        if len(payload) < per_page:
            return
        page_no += 1

 

def _cross_reference_pr_url(item):
    """Return the referencing PR's html_url from a 'cross-referenced'
    event/timeline payload, or None when the source is not a PR link.

    The original special-cased source.issue.pull_request, but both branches
    added the URL whenever it contained '/pull/', so one check is equivalent.
    """
    src = item.get("source") or {}
    issue_obj = src.get("issue") or {}
    html_url = issue_obj.get("html_url")
    if html_url and "/pull/" in html_url:
        return html_url
    return None


def extract_pr_urls_from_issue(issue_number: int):
    """
    Extract PR URLs associated with an issue by scanning:
      - Issue body
      - Issue comments
      - Issue events (including 'mentioned', 'cross-referenced', etc.)
      - Issue timeline (most reliable for cross references)

    Returns a sorted list of unique, normalized HTML PR URLs.
    Requires:
      - REPO = "owner/repo"
      - gh_get(url, params=None, ...) is available

    BUG FIX: the events loop originally called `isinstance(evdict)` — a
    missing comma that raised NameError on the first event. The unused
    `base_headers` local was also removed.
    """
    owner, repo = REPO.split("/", 1)
    pr_urls = set()

    # 1) Issue body
    issue_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}"
    issue = gh_get(issue_url)
    if isinstance(issue, dict):
        pr_urls |= _collect_from_text(issue.get("body") or "", owner, repo)

    # 2) All comments
    comments_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}/comments"
    for c in _paginate_gh_get(comments_url):
        pr_urls |= _collect_from_text(c.get("body") or "", owner, repo)

    # 3) Issue events ('mentioned', 'cross-referenced', ...).
    events_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}/events"
    for ev in _paginate_gh_get(events_url):
        if isinstance(ev, dict):
            # Free-text fields: some events carry a body.
            pr_urls |= _collect_from_text(ev.get("body") or "", owner, repo)
            # Structured cross-reference payloads.
            if ev.get("event") == "cross-referenced":
                ref = _cross_reference_pr_url(ev)
                if ref:
                    pr_urls.add(ref)

    # 4) Timeline API (the most complete source of references).
    timeline_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}/timeline"
    for item in _paginate_gh_get(timeline_url):
        if not isinstance(item, dict):
            continue

        # Free-text scan on any plausible string field.
        for key in ("body", "message", "title", "commit_message", "subject"):
            val = item.get(key)
            if isinstance(val, str):
                pr_urls |= _collect_from_text(val, owner, repo)

        # Structured cross-reference payloads.
        if item.get("event") == "cross-referenced":
            ref = _cross_reference_pr_url(item)
            if ref:
                pr_urls.add(ref)

        # Some timeline items are themselves issues/PRs with an html_url.
        html_url = item.get("html_url")
        if isinstance(html_url, str) and "/pull/" in html_url:
            pr_urls.add(html_url)

        # Occasionally the timeline includes API-style URLs.
        api_url = item.get("url")
        if isinstance(api_url, str):
            m = PR_API_URL_RE.search(api_url)
            if m:
                pr_urls.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))

    # Final normalization: keep only HTML PR URLs and sort.
    pr_urls = {m.group(0) for u in pr_urls for m in [PR_HTML_URL_RE.search(u)] if m}
    return sorted(pr_urls)

 

def pr_number_from_url(u):
  """Return the PR number parsed from a '/pull/<n>' URL, or None."""
  match = re.search(r"/pull/(\d+)", u)
  if match is None:
    return None
  return int(match.group(1))

 

def list_pr_files(pr_number):
  """Return every changed-file record for a PR, paging 100 at a time.

  A 404 (e.g. the PR number does not exist in this repo) makes gh_get
  return None, which ends the loop and returns what was collected so far.

  BUG FIX: the original call was missing the comma between `params=...`
  and `ignore_status_codes=404` — a SyntaxError.
  """
  url = f"https://api.github.com/repos/{REPO}/pulls/{pr_number}/files"
  files = []
  page = 1
  while True:
    batch = gh_get(url, params={"per_page":100,"page":page}, ignore_status_codes=404)
    if not batch:
      break
    files.extend(batch)
    page += 1
  return files

 

def get_pr_head_sha(pr_number):
  """Return the head commit SHA of a PR, or None if the PR is missing.

  BUG FIXES: the original was missing the comma before
  `ignore_status_codes=404` (SyntaxError), and on a 404 gh_get returns
  None so the unconditional `pr["head"]` raised TypeError. Callers
  already treat a falsy SHA as "skip this PR".
  """
  url = f"https://api.github.com/repos/{REPO}/pulls/{pr_number}"
  pr = gh_get(url, ignore_status_codes=404)
  if not pr:
    return None
  return pr["head"]["sha"]

 

def get_file_at_sha(path, sha):
  """Fetch a file's text content at a specific ref via the contents API.

  Returns None when the path does not exist at that ref, or when the
  response is not a plain file with a download URL.
  """
  contents_url = f"https://api.github.com/repos/{REPO}/contents/{path}"
  resp = requests.get(contents_url, headers=HEADERS, params={"ref": sha})
  if resp.status_code == 404:
    return None
  resp.raise_for_status()
  meta = resp.json()
  is_plain_file = isinstance(meta, dict) and meta.get("type") == "file"
  if is_plain_file and meta.get("download_url"):
    return gh_get_text(meta["download_url"])
  return None

 

def extract_module_term_from_source(src: str) -> str | None: 

    """ 

    Given a module 'source' string, return the last path segment between the 

    final '/' and the '?' (or end of string if '?' is absent). 

    Examples: 

      git::https://...//modules/container/kubernetes-service?ref=v4.0.15 -> 'kubernetes-service' 

      ../modules/network/vnet -> 'vnet' 

      registry- or other sources with no '/' -> returns None 

    """ 

    if not isinstance(src, str) or not src: 

        return None 

    # Strip query string 

    path = src.split('?', 1)[0] 

    # For git:: URLs that include a double-slash path component ("//modules/..."), 

    # keep the right-most path component regardless of scheme. 

    # Normalize backslashes just in case. 

    path = path.replace('\\', '/') 

    # Remove trailing slashes 

    path = path.rstrip('/') 

    # Split and take last non-empty part 

    parts = [p for p in path.split('/') if p] 

    if not parts: 

        return None 

    return parts[-1] 

 

def parse_module_terms_from_tf(tf_text):
    """
    Parse HCL text and return the set of module 'terms': the last path
    segment (before any '?') of each module block's 'source' attribute.
    Unparseable input yields an empty set.
    """
    terms = set()
    try:
        parsed = hcl2.loads(tf_text)
    except Exception:
        return terms

    def _record(source_value):
        # Only string sources are meaningful; ignore anything else.
        if isinstance(source_value, str):
            term = extract_module_term_from_source(source_value)
            if term:
                terms.add(term)

    blocks = parsed.get("module", [])
    # hcl2 usually yields a list of one-key dicts: [{name: {attrs...}}, ...],
    # but tolerate a plain dict of name -> attrs as well.
    if isinstance(blocks, dict):
        bodies = list(blocks.values())
    elif isinstance(blocks, list):
        bodies = [body for entry in blocks if isinstance(entry, dict) for body in entry.values()]
    else:
        bodies = []
    for body in bodies:
        if isinstance(body, dict):
            _record(body.get("source"))
    return terms

 

def parse_module_sources_from_tf(tf_text):
  """Return the set of raw 'source' strings from module blocks in HCL text.

  Unparseable input yields an empty set.
  """
  sources = set()
  try:
    parsed = hcl2.loads(tf_text)
  except Exception:
    return sources

  blocks = parsed.get("module", [])
  # hcl2 usually yields a list of one-key dicts: [{name: {attrs...}}, ...],
  # but tolerate a plain dict of name -> attrs as well.
  if isinstance(blocks, list):
    bodies = [body for entry in blocks if isinstance(entry, dict) for body in entry.values()]
  elif isinstance(blocks, dict):
    bodies = list(blocks.values())
  else:
    bodies = []
  for body in bodies:
    if isinstance(body, dict):
      src = body.get("source")
      if isinstance(src, str):
        sources.add(src)
  return sources

 

def normalize_local_module_path(source, app_dir):
  """Resolve a relative module source against app_dir.

  Returns a normalized repo-relative POSIX path for './' or '../' sources;
  registry/git/http sources return None.
  """
  import posixpath
  if not (source.startswith("./") or source.startswith("../")):
    return None
  # app_dir is like "workload/appA"
  return posixpath.normpath(posixpath.join(app_dir, source))

 

def list_repo_tf_files_under(dir_path, sha):
  """List .tf file paths under dir_path at the given commit sha.

  Uses `git ls-tree` against the local checkout for accurate path listing;
  file contents are fetched separately at the sha. Returns [] on any
  failure (e.g. git unavailable or sha unknown) as a best-effort fallback.
  """
  import subprocess
  try:
    listing = subprocess.check_output(
      ["git", "ls-tree", "-r", "--name-only", sha, dir_path], text=True
    )
    return [line.strip() for line in listing.splitlines() if line.strip().endswith(".tf")]
  except Exception:
    return []

 

def collect_module_terms_for_app(app_dir, sha):
    """
    Scan all .tf files in the app dir at the PR head sha and collect:
      1) module terms directly used by the app
      2) for local ('./', '../') module sources, one level of recursion:
         module terms defined inside those module directories

    BUG FIX: the original called normalize_local_module_path(srcapp_dir)
    — a missing comma that raised NameError.
    """
    terms = set()
    module_dirs = set()

    for p in list_repo_tf_files_under(app_dir, sha):
        txt = get_file_at_sha(p, sha)
        if not txt:
            continue
        # Module terms referenced directly by the app.
        terms |= parse_module_terms_from_tf(txt)
        # Track local module directories for the second pass.
        for src in parse_module_sources_from_tf(txt):
            local = normalize_local_module_path(src, app_dir)
            if local:
                module_dirs.add(local)

    # Scan local module dirs for additional module terms (one level deep).
    for mdir in sorted(module_dirs):
        for p in list_repo_tf_files_under(mdir, sha):
            txt = get_file_at_sha(p, sha)
            if not txt:
                continue
            terms |= parse_module_terms_from_tf(txt)

    return terms

 

# ---- Main: issues -> PRs -> touched apps -> module terms ----
issues = list_closed_issues()

issue_to_terms = {}  # issue_number -> sorted list of module_terms
for issue in issues:
  inum = issue["number"]
  # PR links found in the issue body/comments/events/timeline.
  pr_urls = extract_pr_urls_from_issue(inum)
  # NOTE(review): pr_number_from_url is evaluated twice per URL here;
  # harmless, but could be hoisted into a single pass.
  pr_numbers = sorted({pr_number_from_url(u) for u in pr_urls if pr_number_from_url(u)})

  if not pr_numbers:
    continue

  terms_for_issue = set()

  for prn in pr_numbers:
    sha = get_pr_head_sha(prn)
    files = list_pr_files(prn)
    if not sha or not files:
        continue
    # Identify which workload apps are touched by this PR.
    # Requirement: multiple application folders within "workload/".
    touched_apps = set()
    for f in files:
      path = f.get("filename","")
      if not path.startswith("workload/"):
        continue
      parts = path.split("/")
      if len(parts) >= 2:
        touched_apps.add("/".join(parts[:2]))  # workload/<app>

    # For each touched app, compute module terms by scanning app + local modules.
    for app_dir in sorted(touched_apps):
      terms_for_issue |= collect_module_terms_for_app(app_dir, sha)

  if terms_for_issue:
    issue_to_terms[inum] = sorted(terms_for_issue)

# Build severity distribution: "severity" = number of issues touching each module term.
rows = []
for inum, terms in issue_to_terms.items():
  for t in set(terms):
    rows.append({"issue": inum, "module_term": t})
print(f"rows={len(rows)}")

# One row per (issue, module_term) pair; consumed by the reporting pass.
df = pd.DataFrame(rows)
df.to_csv("severity_data.csv", index=False)

# Also write a compact JSON for debugging/audit.
with open("issue_to_module_terms.json","w") as f:
  json.dump(issue_to_terms, f, indent=2, sort_keys=True)

print(f"Closed issues considered: {len(issues)}")
print(f"Issues with PR-linked module impact: {len(issue_to_terms)}")

 

# ---- Fourth pass: rebuild the mapping from the cached issues.json ----
# BUG FIX: the original read `import osjson, re, requests, subprocess`,
# which fails with ModuleNotFoundError; 'os' and 'json' are separate modules.
import os, json, re, requests, subprocess
import hcl2
# Repository slug and token for the GitHub REST API (required env vars).
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
# Auth + JSON media type (incl. historical timeline preview) + version pin.
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json", "X-GitHub-Api-Version": "2026-03-10"}

 

# Load the issue snapshot produced by the first pass.
with open("issues.json") as f:
  issues = json.load(f)

# issue number -> sorted list of module terms linked via PRs
issue_to_terms = {}
# issue number -> turnaround in days (float), or None when not computable
issue_turnaround = {}
module_deps = {}  # app_dir -> set(module paths it references)

 

# Per-issue turnaround plus PR-derived module terms and app -> module deps.
from datetime import datetime

# GitHub timestamps are ISO-8601 UTC with a trailing 'Z'.
_GH_TS_FMT = "%Y-%m-%dT%H:%M:%SZ"

for issue in issues:
  inum = issue["number"]

  # Turnaround: days between creation and close (None when unavailable).
  created = issue.get("created_at")
  closed = issue.get("closed_at")
  delta_days = None
  if created and closed:
    try:
      dt_created = datetime.strptime(created, _GH_TS_FMT)
      dt_closed = datetime.strptime(closed, _GH_TS_FMT)
      delta_days = (dt_closed - dt_created).total_seconds() / 86400.0
    except Exception:
      delta_days = None
  issue_turnaround[inum] = delta_days

  # Resolve PR references recorded in the snapshot to PR numbers.
  pr_urls = issue.get("pr_urls",[])
  pr_numbers = sorted({pr_number_from_url(u) for u in pr_urls if pr_number_from_url(u)})
  terms_for_issue = set()
  for prn in pr_numbers:
    sha = get_pr_head_sha(prn)
    files = list_pr_files(prn)
    # Apps are the second-level folders under workload/ touched by the PR.
    touched_apps = set()
    for f in files:
      path = f.get("filename","")
      if path.startswith("workload/"):
        parts = path.split("/")
        if len(parts) >= 2:
          touched_apps.add("/".join(parts[:2]))
    for app_dir in sorted(touched_apps):
      terms_for_issue |= collect_module_terms_for_app(app_dir, sha)
      # Collect local module sources for the dependency graph by scanning
      # the app's .tf files at the PR head sha.
      for p in list_repo_tf_files_under(app_dir, sha):
        txt = get_file_at_sha(p, sha)
        if not txt:
          continue
        for src in parse_module_sources_from_tf(txt):
          # BUG FIX: original read normalize_local_module_path(srcapp_dir)
          # — a missing comma that raised NameError.
          local = normalize_local_module_path(src, app_dir)
          if local:
            module_deps.setdefault(app_dir, set()).add(local)
  if terms_for_issue:
    issue_to_terms[inum] = sorted(terms_for_issue)

 

# Flatten the issue -> module-term mapping into one row per pair.
rows = []
for inum, terms in issue_to_terms.items():
  for t in set(terms):
    rows.append({"issue": inum, "module_term": t})
import pandas as pd
df = pd.DataFrame(rows)
df.to_csv("severity_data.csv", index=False)

# Per-issue turnaround (days; may be None) as its own CSV.
ta_rows = []
for inum, days in issue_turnaround.items():
  ta_rows.append({"issue": inum, "turnaround_days": days})
pd.DataFrame(ta_rows).to_csv("turnaround.csv", index=False)

# JSON artifacts for the reporting pass and for auditing.
with open("issue_to_module_terms.json","w") as f:
  json.dump(issue_to_terms, f, indent=2)
with open("issue_turnaround.json","w") as f:
  json.dump(issue_turnaround, f, indent=2)
with open("module_deps.json","w") as f:
  json.dump({k: sorted(list(v)) for k,v in module_deps.items()}, f, indent=2)

print(f"ISSUES_WITH_TYPES={len(issue_to_terms)}")

 

# ---- Reporting pass: charts + markdown from the CSV/JSON artifacts ----
# BUG FIX: the original read `import osjson, datetime, glob`, which fails
# with ModuleNotFoundError; 'os' and 'json' are separate modules.
import os, json, datetime, glob
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx

# Timestamp label used in every artifact filename for this run.
ts = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
os.makedirs("history", exist_ok=True)

 

# --- Severity bar: closed-issue count per module term ---
if os.path.exists("severity_data.csv"):
  # Fall back to an empty frame when the CSV is unreadable/empty.
  df = pd.DataFrame(columns=["issue", "module_term"])
  try:
      df = pd.read_csv("severity_data.csv")
  except Exception:
      # BUG FIX: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
      pass
  counts = df.groupby("module_term")["issue"].nunique().sort_values(ascending=False)
else:
  counts = pd.Series(dtype=int)

png_sev = f"history/severity-by-module-{ts}.png"
plt.figure(figsize=(12,6))
if not counts.empty:
  counts.plot(kind="bar")
  plt.title("Issue frequency by module term")
  plt.xlabel("module_term")
  plt.ylabel("number of closed issues touching module term")
else:
  # Placeholder chart so the report always has an image to embed.
  plt.text(0.5, 0.5, "No module-impacting issues in window", ha="center", va="center")
  plt.axis("off")
plt.tight_layout()
plt.savefig(png_sev)
plt.clf()

 

# --- Heatmap: module_term x issue (cell = row count) ---
heat_png = f"history/heatmap-module-issues-{ts}.png"

if os.path.exists("severity_data.csv"):
  mat = pd.DataFrame(columns=["issue", "module_term"])
  try:
      mat = pd.read_csv("severity_data.csv")
  except Exception:
      # BUG FIX: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
      pass
  if not mat.empty:
    pivot = mat.pivot_table(index="module_term", columns="issue", aggfunc='size', fill_value=0)
    # Sort rows by total issue count, most-impacted modules first.
    pivot['total'] = pivot.sum(axis=1)
    pivot = pivot.sort_values('total', ascending=False).drop(columns=['total'])
    # Limit columns for readability.
    if pivot.shape[1] > 100:
      pivot = pivot.iloc[:, :100]
    plt.figure(figsize=(14, max(6, 0.2 * pivot.shape[0])))
    sns.heatmap(pivot, cmap="YlOrRd", cbar=True)
    plt.title("Heatmap: module terms (rows) vs issues (columns)")
    plt.xlabel("Issue number (truncated)")
    plt.ylabel("module terms")
    plt.tight_layout()
    plt.savefig(heat_png)
    plt.clf()
  else:
    # Placeholder image so the report embed never breaks.
    plt.figure(figsize=(6,2))
    plt.text(0.5,0.5,"No data for heatmap",ha="center",va="center")
    plt.axis("off")
    plt.savefig(heat_png)
    plt.clf()
else:
  plt.figure(figsize=(6,2))
  plt.text(0.5,0.5,"No data for heatmap",ha="center",va="center")
  plt.axis("off")
  plt.savefig(heat_png)
  plt.clf()

 

# --- Trend lines: aggregate historical severity CSVs in history/ ---
trend_png = f"history/trendlines-module-{ts}.png"
# BUG FIX: the next two explanatory lines were missing their '#' in the
# original and were SyntaxErrors.
# Collect historical CSVs that match the severity-data pattern, and also
# include the current run's severity_data.csv.
hist_files = sorted(glob.glob("history/*severity-data-*.csv") + glob.glob("history/*severity_data.csv") + glob.glob("history/*severity-by-module-*.csv"))
if os.path.exists("severity_data.csv"):
  hist_files.append("severity_data.csv")

# Build per-run counts per module term, labelling each run with the
# timestamp embedded in its filename (falling back to the file's mtime).
import re
from datetime import datetime

trend_df = pd.DataFrame()
for f in hist_files:
  try:
    m = re.search(r"(\d{8}-\d{6})", f)
    if m:
      ts_label = m.group(1)
    else:
      # BUG FIX: the original passed the *string* timestamp from the
      # filename to utcfromtimestamp() (and used curly quotes in the
      # format string), so every file raised and was silently skipped.
      ts_label = datetime.utcfromtimestamp(os.path.getmtime(f)).strftime("%Y%m%d-%H%M%S")
    tmp = pd.DataFrame(columns=["issue", "module_term"])
    try:
        tmp = pd.read_csv(f)
    except Exception:
        pass
    if tmp.empty:
        continue
    counts_tmp = tmp.groupby("module_term")["issue"].nunique().rename(ts_label)
    # BUG FIX: pd.concat([trend_dfcounts_tmp]) was missing its comma.
    trend_df = pd.concat([trend_df, counts_tmp], axis=1)
  except Exception:
    continue
if not trend_df.empty:
  # Rows become runs (time), columns become module terms.
  trend_df = trend_df.fillna(0).T
  plt.figure(figsize=(14,6))
  # Plot the top 8 module terms by count in the most recent run.
  latest = trend_df.iloc[-1].sort_values(ascending=False).head(8).index.tolist()
  for col in latest:
    # BUG FIX: plt.plot(trend_df.indextrend_df[col]) was missing its comma.
    plt.plot(trend_df.index, trend_df[col], marker='o', label=col)
  plt.legend(loc='best', fontsize='small')
  plt.title("Trend lines: issue frequency over time for top module_terms")
  plt.xlabel("time")
  plt.ylabel("issue count")
  plt.xticks(rotation=45)
  plt.tight_layout()
  plt.savefig(trend_png)
  plt.clf()
else:
  plt.figure(figsize=(8,2))
  plt.text(0.5,0.5,"No historical data for trend lines",ha="center",va="center")
  plt.axis("off")
  plt.savefig(trend_png)
  plt.clf()

 

# --- Dependency graph: build directed graph from module_deps.json ---
dep_png = f"history/dependency-graph-{ts}.png"
if os.path.exists("module_deps.json"):
  with open("module_deps.json") as f:
    deps = json.load(f)
  G = nx.DiGraph()
  # add edges app -> module
  for app, mods in deps.items():
    G.add_node(app, type='app')
    for m in mods:
      G.add_node(m, type='module')
      G.add_edge(app, m)
  if len(G.nodes) == 0:
    # Placeholder image so the report embed never breaks.
    plt.figure(figsize=(6,2))
    plt.text(0.5,0.5,"No dependency data",ha="center",va="center")
    plt.axis("off")
    plt.savefig(dep_png)
    plt.clf()
  else:
    plt.figure(figsize=(12,8))
    # Force-directed layout; k controls node spacing.
    pos = nx.spring_layout(G, k=0.5, iterations=50)
    # Blue for apps, green for modules.
    node_colors = ['#1f78b4' if G.nodes[n].get('type')=='app' else '#33a02c' for n in G.nodes()]
    nx.draw_networkx_nodes(G, pos, node_size=600, node_color=node_colors)
    nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle='->', arrowsize=12, edge_color='#888888')
    nx.draw_networkx_labels(G, pos, font_size=8)
    plt.title("Module dependency graph (apps -> local modules)")
    plt.axis('off')
    plt.tight_layout()
    plt.savefig(dep_png)
    plt.clf()
else:
  plt.figure(figsize=(6,2))
  plt.text(0.5,0.5,"No dependency data",ha="center",va="center")
  plt.axis("off")
  plt.savefig(dep_png)
  plt.clf()

 

# --- Turnaround chart: days-to-close per issue ---
ta_png = f"history/turnaround-by-issue-{ts}.png"
if os.path.exists("turnaround.csv"):
  ta = pd.DataFrame(columns=["issue", "turnaround_days"])
  try:
      ta = pd.read_csv("turnaround.csv")
  except Exception:
      # BUG FIX: bare `except:` also swallowed SystemExit/KeyboardInterrupt.
      pass
  # Rows without a computable turnaround are excluded.
  ta = ta.dropna(subset=["turnaround_days"])
  if not ta.empty:
    # Slowest 50 issues, longest turnaround first.
    ta_sorted = ta.sort_values("turnaround_days", ascending=False).head(50)
    plt.figure(figsize=(12,6))
    plt.bar(ta_sorted["issue"].astype(str), ta_sorted["turnaround_days"])
    plt.xticks(rotation=90)
    plt.title("Turnaround time (days) for closed issues in window")
    plt.xlabel("Issue number")
    plt.ylabel("Turnaround (days)")
    plt.tight_layout()
    plt.savefig(ta_png)
    plt.clf()
  else:
    plt.figure(figsize=(8,2))
    plt.text(0.5,0.5,"No turnaround data available",ha="center",va="center")
    plt.axis("off")
    plt.savefig(ta_png)
    plt.clf()
else:
  plt.figure(figsize=(8,2))
  plt.text(0.5,0.5,"No turnaround data available",ha="center",va="center")
  plt.axis("off")
  plt.savefig(ta_png)
  plt.clf()

 

# --- Issue activity charts (opened vs closed) ---
activity_png = f"history/issue-activity-{ts}.png"

if os.path.exists("issue_activity.csv"):
    # Two-row table produced by the activity pass: metric, count.
    act = pd.read_csv("issue_activity.csv")

    plt.figure(figsize=(6,4))
    plt.bar(act["metric"], act["count"], color=["#1f78b4", "#33a02c"])
    plt.title("GitHub issue activity in last window")
    plt.xlabel("Issue state")
    plt.ylabel("Count")
    plt.tight_layout()
    plt.savefig(activity_png)
    plt.clf()
else:
    # Placeholder image so the report embed never breaks.
    plt.figure(figsize=(6,2))
    plt.text(0.5, 0.5, "No issue activity data", ha="center", va="center")
    plt.axis("off")
    plt.savefig(activity_png)
    plt.clf()

 

# --- AI summary (who wants what) ---
# Reload the issue snapshot; tolerate a missing file.
if os.path.exists("issues.json"):
  with open("issues.json") as f:
    issues = json.load(f)
else:
  issues = []
# Keep only the fields the summarizer needs (also bounds prompt size).
condensed = []
for i in issues:
  condensed.append({
    "number": i.get("number"),
    "user": i.get("user"),
    "title": i.get("title"),
    "html_url": i.get("html_url")
  })
with open("issues_for_ai.json","w") as f:
  json.dump(condensed, f, indent=2)

 

# --- Optional AI summary via the OpenAI chat completions API ---
import subprocess, os
OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
ai_text = "AI summary skipped (no OPENAI_API_KEY)."
if OPENAI_KEY:
  # NOTE(review): "\\n\\n" renders as literal backslash-n in the prompt;
  # if real newlines were intended, change to "\n\n" — confirm.
  prompt = ("You are given a JSON array of GitHub issues with fields: number, user, title, html_url. "
            "Produce a concise list of one-line 'who wants what' statements, one per issue, in plain text. "
            "Format: '#<number> — <user> wants <succinct request derived from title>'. "
            "Do not add commentary.")
  payload = {
    "model": "gpt-4o-mini",
    "messages": [{"role":"system","content":"You are a concise summarizer."},
                 {"role":"user","content": prompt + "\\n\\nJSON:\\n" + json.dumps(condensed)[:15000]}],
    "temperature":0.2,
    "max_tokens":400
  }
  # IMPROVED: call the API with `requests` (already used throughout this
  # file) instead of shelling out to curl via subprocess; adds a timeout
  # and surfaces HTTP errors via raise_for_status.
  try:
    resp = requests.post(
      "https://api.openai.com/v1/chat/completions",
      headers={"Content-Type": "application/json",
               "Authorization": f"Bearer {OPENAI_KEY}"},
      data=json.dumps(payload),
      timeout=60,
    )
    resp.raise_for_status()
    ai_text = resp.json()["choices"][0]["message"]["content"].strip()
  except Exception:
    ai_text = "AI summary unavailable (parsing error)."

 

# --- Write markdown report combining all visuals ---
md_path = f"history/severity-report-{ts}.md"
with open(md_path, "w") as f:
  f.write("# Weekly Terraform module hotspot report\n\n")
  f.write(f"**Window (days):** {os.environ.get('WINDOW_DAYS','7')}\n\n")
  f.write("## AI Summary (who wants what)\n\n")
  f.write("```\n")
  f.write(ai_text + "\n")
  f.write("```\n\n")
  f.write("## GitHub issue activity (last window)\n\n")
  # Images are referenced by basename: the report lives in history/
  # alongside the PNGs.
  f.write(f"![{os.path.basename(activity_png)}]"
          f"({os.path.basename(activity_png)})\n\n")

  if os.path.exists("issue_activity.csv"):
      act = pd.read_csv("issue_activity.csv")
      f.write(act.to_markdown(index=False) + "\n\n")
  f.write("## Top module terms by issue frequency\n\n")
  if not counts.empty:
    f.write("![" + os.path.basename(png_sev) + "](" + os.path.basename(png_sev) + ")\n\n")
    f.write(counts.head(30).to_frame("issues").to_markdown() + "\n\n")
  else:
    f.write("No module-impacting issues found in the selected window.\n\n")
  f.write("## Heatmap: module terms vs issues\n\n")
  f.write("![" + os.path.basename(heat_png) + "](" + os.path.basename(heat_png) + ")\n\n")
  f.write("## Trend lines: historical issue frequency for top module terms\n\n")
  f.write("![" + os.path.basename(trend_png) + "](" + os.path.basename(trend_png) + ")\n\n")
  f.write("## Dependency graph: apps -> local modules\n\n")
  f.write("![" + os.path.basename(dep_png) + "](" + os.path.basename(dep_png) + ")\n\n")
  f.write("## Turnaround time for closed issues (days)\n\n")
  f.write("![" + os.path.basename(ta_png) + "](" + os.path.basename(ta_png) + ")\n\n")
  f.write("## Data artifacts\n\n")
  f.write("- `severity_data.csv` — per-issue module term mapping\n")
  f.write("- `turnaround.csv` — per-issue turnaround in days\n")
  f.write("- `issue_to_module_terms.json` — mapping used to build charts\n")
  f.write("- `module_deps.json` — module dependency data used for graph\n")

 

# Save current CSVs into history with timestamp for future trend aggregation
# (best-effort: copy failures are deliberately ignored).
try:
  import shutil
  if os.path.exists("severity_data.csv"):
    shutil.copy("severity_data.csv", f"history/severity-data-{ts}.csv")
  if os.path.exists("turnaround.csv"):
    shutil.copy("turnaround.csv", f"history/turnaround-{ts}.csv")
except Exception:
  pass

# Emit artifact paths for the CI log / downstream steps.
print(f"REPORT_MD={md_path}")
print(f"REPORT_PNG={png_sev}")
print(f"REPORT_HEAT={heat_png}")
print(f"REPORT_TREND={trend_png}")
print(f"REPORT_DEP={dep_png}")
print(f"REPORT_TA={ta_png}")

 

# ---- Prune history/: keep only the newest 10 timestamped report sets ----
import os, re
from pathlib import Path

hist = Path("history")
hist.mkdir(exist_ok=True)

# Pair md+png by timestamp in filename: severity-by-module-YYYYMMDD-HHMMSS.(md|png)
pat = re.compile(r"^severity-by-module-(\d{8}-\d{6})\.(md|png)$")

# Group matching files by their embedded timestamp.
by_timestamp = {}
for entry in hist.iterdir():
  match = pat.match(entry.name)
  if match:
    by_timestamp.setdefault(match.group(1), []).append(entry)

# Retain the 10 most recent timestamps; everything older is deleted.
keep = set(sorted(by_timestamp, reverse=True)[:10])
drop = [f for stamp, files in by_timestamp.items() if stamp not in keep for f in files]

for f in drop:
  f.unlink()

print(f"Kept {len(keep)} report sets; pruned {len(drop)} files.")

 

--- 

This produces the various JSON and CSV output files mentioned above. Sample content of one of them (`issue_activity.csv`):

   metric  count
0  opened      8
1  closed      8
Care must be taken not to run into API rate limits. For example, a throttled request returns:

{"message": "API rate limit exceeded for <client-ip-address>", "documentation_url": "https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

No comments:

Post a Comment