Thursday, April 2, 2026

The following is sample code for gathering custom insights into the GitHub issues opened against a repository on a periodic basis:

#! /usr/bin/python 

import os, requests, json, datetime, re 

# ---- Configuration (read from environment variables) ----
# REPO is interpolated into https://api.github.com/repos/{REPO}/..., i.e. "owner/repo".
# Both REPO and GH_TOKEN are required: a missing variable raises KeyError immediately.
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
# Length of the look-back window in days (defaults to 7).
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))
# Headers sent with every GitHub request made through the helpers below.
HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
    "X-GitHub-Api-Version": "2026-03-10",
}
# Start of the window as a UTC ISO-8601 timestamp ending in "Z".
# datetime.utcnow() is deprecated since Python 3.12, so an aware "now" is used and
# formatted explicitly (isoformat() on an aware value would emit "+00:00", not "Z").
since = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=WINDOW_DAYS)
).strftime("%Y-%m-%dT%H:%M:%SZ")

 

# ---- Helpers ---- 

def gh_get(url, params=None, ignore_status_codes=None):
    """GET ``url`` with the shared HEADERS and return the decoded JSON body.

    Args:
        url: Absolute request URL.
        params: Optional query-string parameters passed to ``requests.get``.
        ignore_status_codes: A single HTTP status code (int) or an iterable of
            codes. A response carrying one of them yields ``None`` instead of
            raising.

    Returns:
        The parsed JSON payload, or ``None`` for an ignored status code.

    Raises:
        requests.HTTPError: for any other 4xx/5xx response.
        requests.Timeout: if the server does not answer within the timeout.
    """
    if ignore_status_codes is None:
        ignored = frozenset()
    elif isinstance(ignore_status_codes, int):
        ignored = frozenset((ignore_status_codes,))
    else:
        ignored = frozenset(ignore_status_codes)

    # Without a timeout a stalled connection would block the whole run forever.
    r = requests.get(url, headers=HEADERS, params=params, timeout=30)
    if r.status_code in ignored:
        return None
    r.raise_for_status()
    return r.json()

 

def gh_get_text(url):
    """GET ``url`` with the shared HEADERS and return the response body as text.

    Raises:
        requests.HTTPError: for a 4xx/5xx response.
        requests.Timeout: if the server does not answer within the timeout.
    """
    # Without a timeout a stalled download would block the whole run forever.
    r = requests.get(url, headers=HEADERS, timeout=30)
    r.raise_for_status()
    return r.text

 

def _fetch_all_pages(url, params=None):
    """Return the concatenated items of every page of a GitHub list endpoint."""
    query = dict(params or {})
    query.setdefault("per_page", 100)
    collected = []
    page = 1
    while True:
        batch = gh_get(url, params={**query, "page": page})
        if not batch:
            break
        collected.extend(batch)
        # A short page is the last one; stop without an extra empty request.
        if len(batch) < query["per_page"]:
            break
        page += 1
    return collected


# Closed issues filtered by `since`. The original fetched only the first page
# (at most 100 items); every page is read now.
issues_url = f"https://api.github.com/repos/{REPO}/issues"
params = {"state":"closed","since":since,"per_page":100}
items = _fetch_all_pages(issues_url, params)

issues = []
for i in items:
    # The issues endpoint also returns pull requests; they carry a "pull_request" key.
    if "pull_request" in i:
        continue
    comments = _fetch_all_pages(i["comments_url"], {"per_page":100})
    pr_urls = set()
    for c in comments:
        body = c.get("body","") or ""
        # Full PR links pasted into a comment.
        for m in re.findall(r"https://github\.com/[^/\s]+/[^/\s]+/pull/\d+", body):
            pr_urls.add(m)
        # "#123" shorthand is assumed to name a PR of REPO; it is not verified here.
        for m in re.findall(r"(?:^|\s)#(\d+)\b", body):
            pr_urls.add(f"https://github.com/{REPO}/pull/{m}")
    issues.append({
        "number": i["number"],
        "title": i.get("title",""),
        # "user" may be present but null; guard so .get() is never called on None.
        "user": (i.get("user") or {}).get("login",""),
        "created_at": i.get("created_at"),
        "closed_at": i.get("closed_at"),
        "html_url": i.get("html_url"),
        "comments": [{"id":c.get("id"), "body":c.get("body",""), "created_at":c.get("created_at")} for c in comments],
        "pr_urls": sorted(pr_urls)
    })

# Persist the export for the later steps and report how many issues were written.
with open("issues.json","w") as f:
    json.dump(issues, f, indent=2)
print(f"WROTE_ISSUES={len(issues)}")

 

import os, requests, datetime, pandas as pd 

 

# ---- Configuration (read from environment variables) ----
# REPO and GH_TOKEN are required; a missing variable raises KeyError immediately.
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
# Length of the look-back window in days (defaults to 7).
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))

headers = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json",
}

# Start of the window as a UTC ISO-8601 timestamp ending in "Z".
# datetime.utcnow() is deprecated since Python 3.12, so an aware "now" is used and
# formatted explicitly to keep the trailing "Z".
since = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=WINDOW_DAYS)
).strftime("%Y-%m-%dT%H:%M:%SZ")
url = f"https://api.github.com/repos/{REPO}/issues"

 

def fetch(state):
    """Return all non-PR issues in ``state`` that match the module-level ``since``.

    Pages through the issues endpoint 100 items at a time and drops entries that
    carry a "pull_request" key (the endpoint returns PRs as well as issues).

    NOTE(review): per the GitHub docs ``since`` filters on last-update time, so
    ``fetch("open")`` yields open issues updated in the window, which is not
    necessarily the same as issues opened in it. Confirm the intended metric.

    Args:
        state: Value for the ``state`` query parameter (e.g. "open", "closed").

    Raises:
        requests.HTTPError: for a 4xx/5xx response.
    """
    items = []
    page = 1
    while True:
        r = requests.get(
            url,
            headers=headers,
            params={"state": state, "since": since, "per_page": 100, "page": page},
            timeout=30,
        )
        r.raise_for_status()
        raw = r.json()
        items.extend(i for i in raw if "pull_request" not in i)
        # Decide on the RAW page size: a full page that merely contains PRs must
        # not end pagination (the original compared the filtered batch and so
        # stopped early whenever a page held at least one PR).
        if len(raw) < 100:
            break
        page += 1
    return items

 

# Fetch both states, tabulate the two totals and persist them as a CSV.
opened, closed = fetch("open"), fetch("closed")

df = pd.DataFrame(
    {
        "metric": ["opened", "closed"],
        "count": [len(opened), len(closed)],
    }
)

df.to_csv("issue_activity.csv", index=False)
print(df)

 

import os, re, json, datetime, requests 

import hcl2 

import pandas as pd 

 

# ---- Configuration (read from environment variables) ----
REPO = os.environ["GITHUB_REPOSITORY"]
GH_TOKEN = os.environ["GH_TOKEN"]
HEADERS = {
    "Authorization": f"Bearer {GH_TOKEN}",
    "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
    "X-GitHub-Api-Version": "2026-03-10",
}

# ---- Time window ----
# The other steps in this file read WINDOW_DAYS; honour it here too, keeping the
# previous hard-coded 7 days as the default.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))
# datetime.utcnow() is deprecated since Python 3.12, so an aware "now" is used and
# formatted explicitly to keep the trailing "Z".
since = (
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=WINDOW_DAYS)
).strftime("%Y-%m-%dT%H:%M:%SZ")

 

# ---- Helpers ---- 

def list_closed_issues():
    """Return every closed issue of REPO matching ``since``, excluding pull requests.

    The issues endpoint returns both issues and PRs; entries carrying a
    "pull_request" key are dropped. All pages are read (the original fetched
    only the first 100 items).
    """
    url = f"https://api.github.com/repos/{REPO}/issues"
    found = []
    page = 1
    while True:
        batch = gh_get(
            url,
            params={"state":"closed","since":since,"per_page":100,"page":page},
        )
        if not batch:
            break
        found.extend(i for i in batch if "pull_request" not in i)
        # Compare the raw page size, not the filtered one, to detect the last page.
        if len(batch) < 100:
            break
        page += 1
    return found

 

# Browser-style PR link: http(s)://github.com/<owner>/<repo>/pull/<num>
PR_HTML_URL_RE = re.compile(r"https?://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pull/(?P<num>\d+)", flags=re.I)

# REST-style PR link: http(s)://api.github.com/repos/<owner>/<repo>/pulls/<num>
PR_API_URL_RE = re.compile(r"https?://api\.github\.com/repos/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pulls/(?P<num>\d+)", flags=re.I)

# Shorthand references found in free text:
#   "#123"            -> number in the current repository (must not follow a word character)
#   "owner/repo#123"  -> number in an explicitly named repository
SHORTHAND_SAME_REPO_RE = re.compile(r"(?<!\w)#(?P<num>\d+)\b")
SHORTHAND_CROSS_REPO_RE = re.compile(r"(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+)#(?P<num>\d+)\b")

 

def _normalize_html_pr_url(owner: str, repo: str, num: int) -> str: 

    return f"https://github.com/{owner}/{repo}/pull/{int(num)}" 

 

def _collect_from_text(text: str, default_owner: str, default_repo: str) -> set:
    """Extract candidate PR URLs from free text (body/comments/events text).

    Recognises HTML PR links, API PR links, ``owner/repo#N`` shorthand and bare
    ``#N`` shorthand (attributed to ``default_owner``/``default_repo``). Every hit
    is normalised to the HTML PR URL form. Shorthand hits are candidates only:
    nothing here checks that the number is a pull request rather than an issue.

    Args:
        text: Text to scan; falsy input yields an empty set.
        default_owner: Owner used for bare ``#N`` references.
        default_repo: Repository used for bare ``#N`` references.

    Returns:
        A set of normalised HTML PR URLs.
    """
    found = set()
    if not text:
        return found

    # 1-3) Patterns that carry their own owner/repo groups.
    for pattern in (PR_HTML_URL_RE, PR_API_URL_RE, SHORTHAND_CROSS_REPO_RE):
        for m in pattern.finditer(text):
            found.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))

    # 4) Same-repo shorthand: #123 (the commas between the arguments were lost in
    #    the original line, making it a NameError at runtime).
    for m in SHORTHAND_SAME_REPO_RE.finditer(text):
        found.add(_normalize_html_pr_url(default_owner, default_repo, m.group("num")))

    return found

 

def _paginate_gh_get(url, headers=None, per_page=100):
    """Yield the items of a list endpoint page by page via ``gh_get``.

    Iteration ends when a response is not a list, is empty, or holds fewer than
    ``per_page`` items. ``headers`` is accepted but not used by this function.
    """
    page_no = 0
    while True:
        page_no += 1
        data = gh_get(url, params={"per_page": per_page, "page": page_no})
        if not isinstance(data, list) or not data:
            return
        yield from data
        if len(data) < per_page:
            return

 

def _cross_referenced_pr_url(entry):
    """Return the PR html_url behind a 'cross-referenced' entry, or None."""
    if entry.get("event") != "cross-referenced":
        return None
    source_issue = (entry.get("source") or {}).get("issue") or {}
    html_url = source_issue.get("html_url")
    # The original had two branches (with and without the "pull_request" marker)
    # that both added the URL; the URL shape is the only effective condition.
    if isinstance(html_url, str) and "/pull/" in html_url:
        return html_url
    return None


def extract_pr_urls_from_issue(issue_number: int):
    """
    Extract PR URLs associated with an issue by scanning:
      - Issue body
      - Issue comments
      - Issue events ('cross-referenced' entries and any 'body' text)
      - Issue timeline (text fields, 'cross-referenced' entries, html_url/url)

    Free-text shorthand such as "#123" is treated as a PR candidate without
    checking that the number really is a pull request.

    Returns a sorted list of unique HTML PR URLs.
    Requires:
      - REPO = "owner/repo"
      - gh_get(url, params=None, ignore_status_codes=None) is available
    """
    owner, repo = REPO.split("/", 1)
    pr_urls = set()
    issue_api = f"https://api.github.com/repos/{REPO}/issues/{issue_number}"

    # 1) Issue body
    issue = gh_get(issue_api)
    if isinstance(issue, dict):
        pr_urls |= _collect_from_text(issue.get("body") or "", owner, repo)

    # 2) All comments
    for c in _paginate_gh_get(f"{issue_api}/comments"):
        if isinstance(c, dict):
            pr_urls |= _collect_from_text(c.get("body") or "", owner, repo)

    # 3) Issue events
    for ev in _paginate_gh_get(f"{issue_api}/events"):
        # The original read `isinstance(evdict)` (comma lost), which raises NameError.
        if not isinstance(ev, dict):
            continue
        pr_urls |= _collect_from_text(ev.get("body") or "", owner, repo)
        linked = _cross_referenced_pr_url(ev)
        if linked:
            pr_urls.add(linked)

    # 4) Timeline API
    for item in _paginate_gh_get(f"{issue_api}/timeline"):
        if not isinstance(item, dict):
            continue

        # Free-text scan on any plausible string field
        for key in ("body", "message", "title", "commit_message", "subject"):
            val = item.get(key)
            if isinstance(val, str):
                pr_urls |= _collect_from_text(val, owner, repo)

        # Structured cross-reference payloads
        linked = _cross_referenced_pr_url(item)
        if linked:
            pr_urls.add(linked)

        # Timeline items whose own html_url points at a PR
        html_url = item.get("html_url")
        if isinstance(html_url, str) and "/pull/" in html_url:
            pr_urls.add(html_url)

        # API-style URLs are converted to their HTML form
        api_url = item.get("url")
        if isinstance(api_url, str):
            m = PR_API_URL_RE.search(api_url)
            if m:
                pr_urls.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))

    # Final normalization: keep only the HTML PR URL part of each entry (drops
    # fragments such as "#issuecomment-...") and discard anything that does not match.
    cleaned = set()
    for candidate in pr_urls:
        m = PR_HTML_URL_RE.search(candidate)
        if m:
            cleaned.add(m.group(0))
    return sorted(cleaned)

 

def pr_number_from_url(u):
    """Return the integer after the first "/pull/" in ``u``, or None if there is none."""
    found = re.search(r"/pull/(\d+)", u)
    if found is None:
        return None
    return int(found.group(1))

 

def list_pr_files(pr_number):
    """Return the file entries of pull request ``pr_number`` in REPO.

    Pages through ``/pulls/<n>/files`` 100 entries at a time. A 404 (the number
    is not a pull request of REPO) is ignored and results in an empty list.
    """
    url = f"https://api.github.com/repos/{REPO}/pulls/{pr_number}/files"
    files = []
    page = 1
    while True:
        # The comma before ignore_status_codes was lost in the original (syntax error).
        batch = gh_get(url, params={"per_page":100,"page":page}, ignore_status_codes=404)
        if not batch:
            break
        files.extend(batch)
        # A short page is the last one; skip the extra empty request.
        if len(batch) < 100:
            break
        page += 1
    return files

 

def get_pr_head_sha(pr_number):
    """Return the head commit SHA of pull request ``pr_number`` in REPO, or None.

    ``None`` is returned when the API answers 404 (``gh_get`` is asked to ignore
    that status) or when the payload carries no ``head.sha``.
    """
    url = f"https://api.github.com/repos/{REPO}/pulls/{pr_number}"
    # The original passed `urlignore_status_codes=404` (comma lost), a TypeError.
    pr = gh_get(url, ignore_status_codes=404)
    if not isinstance(pr, dict):
        return None
    return (pr.get("head") or {}).get("sha")

 

def get_file_at_sha(path, sha):
    """Return the text of ``path`` at ref ``sha`` via the contents API, or None.

    None is returned for a 404 and for any payload that is not a file entry with
    a ``download_url``; other HTTP errors raise through ``raise_for_status``.
    """
    response = requests.get(
        f"https://api.github.com/repos/{REPO}/contents/{path}",
        headers=HEADERS,
        params={"ref": sha},
    )
    if response.status_code == 404:
        return None
    response.raise_for_status()

    meta = response.json()
    is_file = isinstance(meta, dict) and meta.get("type") == "file"
    if not (is_file and meta.get("download_url")):
        return None
    # The body is fetched separately from the URL the API hands back.
    return gh_get_text(meta["download_url"])

 

def extract_module_term_from_source(src: str) -> str | None: 

    """ 

    Given a module 'source' string, return the last path segment between the 

    final '/' and the '?' (or end of string if '?' is absent). 

    Examples: 

      git::https://...//modules/container/kubernetes-service?ref=v4.0.15 -> 'kubernetes-service' 

      ../modules/network/vnet -> 'vnet' 

      registry- or other sources with no '/' -> returns None 

    """ 

    if not isinstance(src, str) or not src: 

        return None 

    # Strip query string 

    path = src.split('?', 1)[0] 

    # For git:: URLs that include a double-slash path component ("//modules/..."), 

    # keep the right-most path component regardless of scheme. 

    # Normalize backslashes just in case. 

    path = path.replace('\\', '/') 

    # Remove trailing slashes 

    path = path.rstrip('/') 

    # Split and take last non-empty part 

    parts = [p for p in path.split('/') if p] 

    if not parts: 

        return None 

    return parts[-1] 

 

def parse_module_terms_from_tf(tf_text):
    """
    Parse HCL to find module blocks and return the set of module 'terms'
    extracted from their 'source' attribute (last segment before '?').

    The module walk is delegated to parse_module_sources_from_tf so the two
    functions cannot drift apart; text that fails to parse yields an empty set,
    as does that helper.
    """
    terms = set()
    for src in parse_module_sources_from_tf(tf_text):
        term = extract_module_term_from_source(src)
        if term:
            terms.add(term)
    return terms

 

def parse_module_sources_from_tf(tf_text):
    """Return the set of string 'source' values of all module blocks in ``tf_text``.

    Text that ``hcl2.loads`` cannot parse yields an empty set. The parsed
    "module" entry is handled both as a list of {name: body} dicts and as a
    single {name: body} dict.
    """
    try:
        parsed = hcl2.loads(tf_text)
    except Exception:
        return set()

    module_section = parsed.get("module", [])
    if isinstance(module_section, dict):
        groups = [module_section]
    elif isinstance(module_section, list):
        groups = [entry for entry in module_section if isinstance(entry, dict)]
    else:
        groups = []

    return {
        body["source"]
        for group in groups
        for body in group.values()
        if isinstance(body, dict) and isinstance(body.get("source"), str)
    }

 

def normalize_local_module_path(source, app_dir):
    """Resolve a "./" or "../" module source against ``app_dir`` (POSIX style).

    Any other kind of source (registry, git, http, ...) yields None.
    """
    import posixpath

    if not source.startswith(("./", "../")):
        return None
    # app_dir is like "workload/appA"
    joined = posixpath.join(app_dir, source)
    return posixpath.normpath(joined)

 

def list_repo_tf_files_under(dir_path, sha):
    """List the ``.tf`` paths under ``dir_path`` at commit ``sha`` using ``git ls-tree``.

    Best-effort: any failure (git missing, unknown sha, ...) yields an empty list.
    """
    import subprocess

    command = ["git","ls-tree","-r","--name-only",sha,dir_path]
    try:
        listing = subprocess.check_output(command, text=True)
    except Exception:
        return []
    return [line.strip() for line in listing.splitlines() if line.strip().endswith(".tf")]

 

def collect_module_terms_for_app(app_dir, sha):
    """
    Scan all .tf files of the app dir at commit ``sha`` and return the set of:
      1) module terms used directly by the app
      2) module terms used inside any local module the app references
         (one level deep; local modules of local modules are not followed)

    NOTE(review): local sources are resolved against ``app_dir`` even for .tf
    files found in nested sub-directories (the listing is recursive). Confirm
    whether such files should resolve against their own directory instead.
    """
    terms = set()
    module_dirs = set()

    for p in list_repo_tf_files_under(app_dir, sha):
        txt = get_file_at_sha(p, sha)
        if not txt:
            continue
        # Collect module terms directly in the app
        terms |= parse_module_terms_from_tf(txt)
        # Track local modules so we can scan their contents
        for src in parse_module_sources_from_tf(txt):
            # The comma between the two arguments was lost in the original (NameError).
            local = normalize_local_module_path(src, app_dir)
            if local:
                module_dirs.add(local)

    # Scan local module dirs for additional module terms (one level deep)
    for mdir in sorted(module_dirs):
        for p in list_repo_tf_files_under(mdir, sha):
            txt = get_file_at_sha(p, sha)
            if not txt:
                continue
            terms |= parse_module_terms_from_tf(txt)

    return terms

 

# ---- Main: issues -> PRs -> touched apps -> module terms ---- 

issues = list_closed_issues()

issue_to_terms = {}  # issue_number -> sorted list of module terms
for issue in issues:
    inum = issue["number"]
    pr_urls = extract_pr_urls_from_issue(inum)

    # Files and head SHAs are fetched from REPO below, so only PR URLs that point
    # at REPO are usable; a cross-repo URL would otherwise be looked up as the
    # same-numbered PR of REPO.
    pr_number_set = set()
    for u in pr_urls:
        m = PR_HTML_URL_RE.search(u)
        if not m or f"{m.group('owner')}/{m.group('repo')}".lower() != REPO.lower():
            continue
        n = pr_number_from_url(u)
        if n:
            pr_number_set.add(n)
    pr_numbers = sorted(pr_number_set)

    if not pr_numbers:
        continue

    terms_for_issue = set()

    for prn in pr_numbers:
        sha = get_pr_head_sha(prn)
        files = list_pr_files(prn)
        if not sha or not files:
            continue
        # Identify which workload apps are touched by this PR.
        # Requirement: multiple application folders within "workload/".
        touched_apps = set()
        for f in files:
            path = f.get("filename","")
            if not path.startswith("workload/"):
                continue
            parts = path.split("/")
            if len(parts) >= 2:
                touched_apps.add("/".join(parts[:2]))  # workload/<app>

        # For each touched app, compute module terms by scanning app + local modules.
        for app_dir in sorted(touched_apps):
            terms_for_issue |= collect_module_terms_for_app(app_dir, sha)

    if terms_for_issue:
        issue_to_terms[inum] = sorted(terms_for_issue)

# Build severity distribution: "severity" = number of issues touching each module term.
# The per-issue lists are already sorted and unique, so iterating them directly
# keeps the row order stable between runs.
rows = []
for inum, terms in issue_to_terms.items():
    for t in terms:
        rows.append({"issue": inum, "module_term": t})
print(f"rows={len(rows)}")

# Explicit columns guarantee a header line even when there are no rows; without
# it the CSV cannot be parsed by pd.read_csv in the reporting step.
df = pd.DataFrame(rows, columns=["issue", "module_term"])
df.to_csv("severity_data.csv", index=False)

# Also write a compact JSON for debugging/audit.
with open("issue_to_module_terms.json","w") as f:
    json.dump(issue_to_terms, f, indent=2, sort_keys=True)

print(f"Closed issues considered: {len(issues)}")
print(f"Issues with PR-linked module impact: {len(issue_to_terms)}")

 

# The original line read `import osjson, ...`: the comma between os and json was lost.
import os, json, re, requests, subprocess
import hcl2

# NOTE(review): the loop that follows calls helpers such as pr_number_from_url,
# get_pr_head_sha, list_pr_files and collect_module_terms_for_app, which are
# defined earlier in this file. If this section is run as a separate script,
# those definitions must be included.
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
HEADERS = {
    "Authorization": f"Bearer {TOKEN}",
    "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json",
    "X-GitHub-Api-Version": "2026-03-10",
}

# Issues exported by the first step (number, timestamps, comments, pr_urls).
with open("issues.json") as f:
    issues = json.load(f)

 

# Aliased import: the original ran `from datetime import datetime` inside the loop,
# which rebinds the module-level name `datetime` from the module to the class.
from datetime import datetime as _datetime

_TS_FMT = "%Y-%m-%dT%H:%M:%SZ"


def _turnaround_days(created, closed):
    """Return the days between two _TS_FMT timestamps, or None if missing/unparseable."""
    if not (created and closed):
        return None
    try:
        delta = _datetime.strptime(closed, _TS_FMT) - _datetime.strptime(created, _TS_FMT)
    except (TypeError, ValueError):
        return None
    return delta.total_seconds() / 86400.0


issue_to_terms = {}    # issue number -> sorted list of module terms
issue_turnaround = {}  # issue number -> days from created_at to closed_at (or None)
module_deps = {}       # app_dir -> set(module paths it references)

# Only PR URLs of REPO are usable: SHAs and file lists are fetched from REPO, so a
# cross-repo URL would otherwise be looked up as the same-numbered PR of REPO.
_repo_pr_prefix = f"https://github.com/{REPO}/pull/".lower()

for issue in issues:
    inum = issue["number"]
    issue_turnaround[inum] = _turnaround_days(issue.get("created_at"), issue.get("closed_at"))

    pr_urls = [u for u in issue.get("pr_urls",[]) if u.lower().startswith(_repo_pr_prefix)]
    pr_numbers = sorted({n for n in map(pr_number_from_url, pr_urls) if n})
    terms_for_issue = set()
    for prn in pr_numbers:
        sha = get_pr_head_sha(prn)
        files = list_pr_files(prn)
        # "#N" references may name issues rather than PRs; skip anything that has
        # no head SHA or no files instead of failing on it.
        if not sha or not files:
            continue
        touched_apps = set()
        for f in files:
            path = f.get("filename","")
            if path.startswith("workload/"):
                parts = path.split("/")
                if len(parts) >= 2:
                    touched_apps.add("/".join(parts[:2]))
        for app_dir in sorted(touched_apps):
            terms_for_issue |= collect_module_terms_for_app(app_dir, sha)
            # Collect module sources for the dependency graph by scanning the
            # app's .tf files at the PR head.
            # NOTE(review): sources are resolved against app_dir even for files in
            # nested sub-directories; confirm that is the intended base.
            for p in list_repo_tf_files_under(app_dir, sha):
                txt = get_file_at_sha(p, sha)
                if not txt:
                    continue
                for src in parse_module_sources_from_tf(txt):
                    # The comma between the arguments was lost in the original (NameError).
                    local = normalize_local_module_path(src, app_dir)
                    if local:
                        module_deps.setdefault(app_dir, set()).add(local)
    if terms_for_issue:
        issue_to_terms[inum] = sorted(terms_for_issue)

 

# One (issue, module_term) row per pair; the per-issue lists are already sorted
# and unique, so iterating them directly keeps the row order stable.
rows = []
for inum, terms in issue_to_terms.items():
    for t in terms:
        rows.append({"issue": inum, "module_term": t})

import pandas as pd

# Explicit columns guarantee a header line even when there are no rows; without
# it the reporting step cannot parse the file with pd.read_csv.
df = pd.DataFrame(rows, columns=["issue", "module_term"])
df.to_csv("severity_data.csv", index=False)

ta_rows = [{"issue": inum, "turnaround_days": days} for inum, days in issue_turnaround.items()]
pd.DataFrame(ta_rows, columns=["issue", "turnaround_days"]).to_csv("turnaround.csv", index=False)

# JSON copies for debugging/audit and for the dependency-graph step.
with open("issue_to_module_terms.json","w") as f:
    json.dump(issue_to_terms, f, indent=2)
with open("issue_turnaround.json","w") as f:
    json.dump(issue_turnaround, f, indent=2)
with open("module_deps.json","w") as f:
    # Sets are not JSON-serialisable; store each dependency set as a sorted list.
    json.dump({k: sorted(v) for k, v in module_deps.items()}, f, indent=2)

print(f"ISSUES_WITH_TYPES={len(issue_to_terms)}")

 

# The original line read `import osjson, ...`: the comma between os and json was lost.
import os, json, datetime, glob
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx

# UTC run timestamp used in every output filename of this step.
# datetime.utcnow() is deprecated since Python 3.12; the formatted value is the same.
ts = datetime.datetime.now(datetime.timezone.utc).strftime("%Y%m%d-%H%M%S")
os.makedirs("history", exist_ok=True)

 

# --- Severity bar (existing) ---
# Number of distinct issues per module term, read from severity_data.csv.
_sev_cols = ["issue", "module_term"]
df = pd.DataFrame(columns=_sev_cols)
if os.path.exists("severity_data.csv"):
    try:
        df = pd.read_csv("severity_data.csv")
    except (pd.errors.EmptyDataError, pd.errors.ParserError):
        # An empty or malformed file is treated as "no data" (was a bare `except:`).
        pass
    # A header-less or foreign CSV would make the groupby below raise KeyError.
    if not set(_sev_cols) <= set(df.columns):
        df = pd.DataFrame(columns=_sev_cols)
counts = df.groupby("module_term")["issue"].nunique().sort_values(ascending=False)

png_sev = f"history/severity-by-module-{ts}.png"
plt.figure(figsize=(12,6))
if not counts.empty:
    counts.plot(kind="bar")
    plt.title("Issue frequency by module term")
    plt.xlabel("module_term")
    plt.ylabel("number of closed issues touching module term")
else:
    plt.text(0.5, 0.5, "No module-impacting issues in window", ha="center", va="center")
    plt.axis("off")
plt.tight_layout()
plt.savefig(png_sev)
# Close rather than clear: every chart opens its own figure, so cleared figures
# would otherwise stay allocated for the rest of the run.
plt.close()

 

# --- Heatmap: module_term x issue (binary or counts) ---
heat_png = f"history/heatmap-module-issues-{ts}.png"

mat = pd.DataFrame(columns=["issue", "module_term"])
if os.path.exists("severity_data.csv"):
    try:
        mat = pd.read_csv("severity_data.csv")
    except (pd.errors.EmptyDataError, pd.errors.ParserError):
        # An empty or malformed file is treated as "no data" (was a bare `except:`).
        pass

if not mat.empty and {"issue", "module_term"} <= set(mat.columns):
    pivot = mat.pivot_table(index="module_term", columns="issue", aggfunc="size", fill_value=0)
    # Order rows by their total count, busiest module term first.
    row_order = pivot.sum(axis=1).sort_values(ascending=False).index
    pivot = pivot.loc[row_order]
    # Limit columns for readability: only the first 100 issue columns are kept.
    if pivot.shape[1] > 100:
        pivot = pivot.iloc[:, :100]
    plt.figure(figsize=(14, max(6, 0.2 * pivot.shape[0])))
    sns.heatmap(pivot, cmap="YlOrRd", cbar=True)
    plt.title("Heatmap: module terms (rows) vs issues (columns)")
    plt.xlabel("Issue number (truncated)")
    plt.ylabel("module terms")
    plt.tight_layout()
else:
    # Single placeholder for "file missing", "file unreadable" and "no rows".
    plt.figure(figsize=(6,2))
    plt.text(0.5,0.5,"No data for heatmap",ha="center",va="center")
    plt.axis("off")
plt.savefig(heat_png)
plt.close()

 

# --- Trend lines: aggregate historical severity_data.csv files in history/ ---
import re
import datetime as _dt  # aliased: `datetime` may be rebound to the class elsewhere in this file

trend_png = f"history/trendlines-module-{ts}.png"
# Collect historical CSVs that match the severity_data naming patterns.
hist_files = sorted(glob.glob("history/*severity-data-*.csv") + glob.glob("history/*severity_data.csv") + glob.glob("history/*severity-by-module-*.csv"))
# Also include the current run's severity_data.csv.
if os.path.exists("severity_data.csv"):
    hist_files.append("severity_data.csv")

# Build one column of per-module issue counts per file, labelled with a
# YYYYmmdd-HHMMSS stamp taken from the filename or, failing that, the file mtime.
_trend_columns = []
for f in hist_files:
    m = re.search(r"(\d{8}-\d{6})", f)
    if m:
        # The original passed this string to utcfromtimestamp(), which raised and
        # made the loop silently skip every file with a timestamped name.
        ts_label = m.group(1)
    else:
        try:
            mtime = os.path.getmtime(f)
        except OSError:
            continue
        ts_label = _dt.datetime.fromtimestamp(mtime, _dt.timezone.utc).strftime("%Y%m%d-%H%M%S")
    try:
        tmp = pd.read_csv(f)
    except (pd.errors.EmptyDataError, pd.errors.ParserError, OSError):
        continue
    if tmp.empty or not {"issue", "module_term"} <= set(tmp.columns):
        continue
    _trend_columns.append(tmp.groupby("module_term")["issue"].nunique().rename(ts_label))

trend_df = pd.concat(_trend_columns, axis=1) if _trend_columns else pd.DataFrame()

if not trend_df.empty:
    # Rows become timestamps; the label format sorts chronologically as text.
    trend_df = trend_df.fillna(0).T.sort_index()
    plt.figure(figsize=(14,6))
    # plot top N module_terms by latest total
    latest = trend_df.iloc[-1].sort_values(ascending=False).head(8).index.tolist()
    for col in latest:
        plt.plot(trend_df.index, trend_df[col], marker='o', label=col)
    plt.legend(loc='best', fontsize='small')
    plt.title("Trend lines: issue frequency over time for top module_terms")
    plt.xlabel("time")
    plt.ylabel("issue count")
    plt.xticks(rotation=45)
    plt.tight_layout()
else:
    plt.figure(figsize=(8,2))
    plt.text(0.5,0.5,"No historical data for trend lines",ha="center",va="center")
    plt.axis("off")
plt.savefig(trend_png)
plt.close()

 

# --- Dependency graph: build directed graph from module_deps.json ---
dep_png = f"history/dependency-graph-{ts}.png"

# A missing file is handled like an empty mapping: both end in the placeholder.
deps = {}
if os.path.exists("module_deps.json"):
    with open("module_deps.json") as f:
        deps = json.load(f)

# One edge per (app -> local module) pair; nodes are tagged for colouring.
G = nx.DiGraph()
for app, mods in deps.items():
    G.add_node(app, type='app')
    for m in mods:
        G.add_node(m, type='module')
        G.add_edge(app, m)

if G.number_of_nodes() > 0:
    plt.figure(figsize=(12,8))
    pos = nx.spring_layout(G, k=0.5, iterations=50)
    node_colors = []
    for n in G.nodes():
        node_colors.append('#1f78b4' if G.nodes[n].get('type')=='app' else '#33a02c')
    nx.draw_networkx_nodes(G, pos, node_size=600, node_color=node_colors)
    nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle='->', arrowsize=12, edge_color='#888888')
    nx.draw_networkx_labels(G, pos, font_size=8)
    plt.title("Module dependency graph (apps -> local modules)")
    plt.axis('off')
    plt.tight_layout()
else:
    plt.figure(figsize=(6,2))
    plt.text(0.5,0.5,"No dependency data",ha="center",va="center")
    plt.axis("off")
plt.savefig(dep_png)
plt.clf()

 

# --- Turnaround chart (existing) ---
ta_png = f"history/turnaround-by-issue-{ts}.png"

_ta_cols = ["issue", "turnaround_days"]
ta = pd.DataFrame(columns=_ta_cols)
if os.path.exists("turnaround.csv"):
    try:
        ta = pd.read_csv("turnaround.csv")
    except (pd.errors.EmptyDataError, pd.errors.ParserError):
        # An empty or malformed file is treated as "no data" (was a bare `except:`).
        pass
    # A header-less or foreign CSV would make dropna(subset=...) raise KeyError.
    if not set(_ta_cols) <= set(ta.columns):
        ta = pd.DataFrame(columns=_ta_cols)
ta = ta.dropna(subset=["turnaround_days"])

if not ta.empty:
    # Show the 50 slowest issues, longest turnaround first.
    ta_sorted = ta.sort_values("turnaround_days", ascending=False).head(50)
    plt.figure(figsize=(12,6))
    plt.bar(ta_sorted["issue"].astype(str), ta_sorted["turnaround_days"])
    plt.xticks(rotation=90)
    plt.title("Turnaround time (days) for closed issues in window")
    plt.xlabel("Issue number")
    plt.ylabel("Turnaround (days)")
    plt.tight_layout()
else:
    # Single placeholder for "file missing", "file unreadable" and "no rows".
    plt.figure(figsize=(8,2))
    plt.text(0.5,0.5,"No turnaround data available",ha="center",va="center")
    plt.axis("off")
plt.savefig(ta_png)
plt.close()

 

# --- Issue activity charts (opened vs closed) ---
activity_png = f"history/issue-activity-{ts}.png"

if not os.path.exists("issue_activity.csv"):
    # Nothing to chart: emit a small placeholder image instead.
    plt.figure(figsize=(6,2))
    plt.text(0.5, 0.5, "No issue activity data", ha="center", va="center")
    plt.axis("off")
else:
    # One bar per metric row of the CSV written by the activity step.
    act = pd.read_csv("issue_activity.csv")
    bar_colors = ["#1f78b4", "#33a02c"]
    plt.figure(figsize=(6,4))
    plt.bar(act["metric"], act["count"], color=bar_colors)
    plt.title("GitHub issue activity in last window")
    plt.xlabel("Issue state")
    plt.ylabel("Count")
    plt.tight_layout()
plt.savefig(activity_png)
plt.clf()

 

# --- AI summary (who wants what) ---
# Load the exported issues (none if the file is absent) and keep only the four
# fields the summariser needs.
issues = []
if os.path.exists("issues.json"):
    with open("issues.json") as f:
        issues = json.load(f)

_ai_fields = ("number", "user", "title", "html_url")
condensed = [{field: i.get(field) for field in _ai_fields} for i in issues]

with open("issues_for_ai.json","w") as f:
    json.dump(condensed, f, indent=2)

 

# call OpenAI if key present (same approach as before)
import subprocess, os

OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
ai_text = "AI summary skipped (no OPENAI_API_KEY)."
if OPENAI_KEY:
    prompt = ("You are given a JSON array of GitHub issues with fields: number, user, title, html_url. "
              "Produce a concise list of one-line 'who wants what' statements, one per issue, in plain text. "
              "Format: '#<number> — <user> wants <succinct request derived from title>'. "
              "Do not add commentary.")
    payload = {
        "model": "gpt-4o-mini",
        "messages": [{"role":"system","content":"You are a concise summarizer."},
                     # Real newlines: the original "\\n" sent literal backslash-n characters.
                     # The JSON is cut at 15000 characters, possibly mid-record.
                     {"role":"user","content": prompt + "\n\nJSON:\n" + json.dumps(condensed)[:15000]}],
        "temperature":0.2,
        "max_tokens":400
    }
    # NOTE(review): the API key is passed on the curl command line, where it is
    # visible in the process list while the request runs. Consider a mechanism
    # that keeps it off argv.
    try:
        proc = subprocess.run([
            "curl","-sS","--max-time","60","https://api.openai.com/v1/chat/completions",
            "-H", "Content-Type: application/json",
            "-H", f"Authorization: Bearer {OPENAI_KEY}",
            "-d", json.dumps(payload)
        ], capture_output=True, text=True)
    except OSError:
        # curl is not installed or not executable; keep the report going.
        proc = None
    if proc is None or proc.returncode != 0 or not proc.stdout:
        # Previously a failed request left the misleading "no OPENAI_API_KEY" text.
        ai_text = "AI summary unavailable (request failed)."
    else:
        try:
            resp = json.loads(proc.stdout)
            ai_text = resp["choices"][0]["message"]["content"].strip()
        except (ValueError, KeyError, IndexError, TypeError, AttributeError):
            # Non-JSON output or an error payload without "choices".
            ai_text = "AI summary unavailable (parsing error)."

 

# --- Write markdown report combining all visuals ---
# Assembles history/severity-report-<ts>.md from the AI summary, the generated
# chart images and the activity/term tables. Images are referenced by base name
# only, i.e. relative to the directory the markdown file is written to.
md_path = f"history/severity-report-{ts}.md"


def _md_image(path):
    """Return a markdown image line whose alt text and target are both the file's base name."""
    name = os.path.basename(path)
    return f"![{name}]({name})\n\n"


# The encoding is pinned because the report text contains non-ASCII characters
# (the em dashes in the artifact list); a non-UTF-8 locale default could
# otherwise fail to encode them.
with open(md_path, "w", encoding="utf-8") as f:
    f.write("# Weekly Terraform module hotspot report\n\n")
    f.write(f"**Window (days):** {os.environ.get('WINDOW_DAYS','7')}\n\n")

    f.write("## AI Summary (who wants what)\n\n")
    f.write("```\n")
    f.write(ai_text + "\n")
    f.write("```\n\n")

    f.write("## GitHub issue activity (last window)\n\n")
    f.write(_md_image(activity_png))
    if os.path.exists("issue_activity.csv"):
        # Same CSV the activity chart is drawn from, rendered as a table.
        act = pd.read_csv("issue_activity.csv")
        f.write(act.to_markdown(index=False) + "\n\n")

    f.write("## Top module terms by issue frequency\n\n")
    if not counts.empty:
        f.write(_md_image(png_sev))
        f.write(counts.head(30).to_frame("issues").to_markdown() + "\n\n")
    else:
        f.write("No module-impacting issues found in the selected window.\n\n")

    f.write("## Heatmap: module terms vs issues\n\n")
    f.write(_md_image(heat_png))

    f.write("## Trend lines: historical issue frequency for top module terms\n\n")
    f.write(_md_image(trend_png))

    f.write("## Dependency graph: apps -> local modules\n\n")
    f.write(_md_image(dep_png))

    f.write("## Turnaround time for closed issues (days)\n\n")
    f.write(_md_image(ta_png))

    f.write("## Data artifacts\n\n")
    f.write("- `severity_data.csv` — per-issue module term mapping\n")
    f.write("- `turnaround.csv` — per-issue turnaround in days\n")
    f.write("- `issue_to_module_terms.json` — mapping used to build charts\n")
    f.write("- `module_deps.json` — module dependency data used for graph\n")

 

# Save current CSVs into history with timestamp for future trend aggregation
# Best effort: a failed copy is reported on stderr and does not stop the script,
# and one file failing no longer prevents the other from being archived.
import shutil
import sys

for csv_name, history_stem in (
    ("severity_data.csv", "severity-data"),
    ("turnaround.csv", "turnaround"),
):
    if not os.path.exists(csv_name):
        continue
    try:
        shutil.copy(csv_name, f"history/{history_stem}-{ts}.csv")
    except OSError as err:
        # stderr keeps the warning out of the KEY=value lines printed on stdout.
        print(f"warning: could not archive {csv_name}: {err}", file=sys.stderr)

 

# Print one KEY=path line per generated report artifact, in a fixed order.
report_outputs = (
    ("REPORT_MD", md_path),
    ("REPORT_PNG", png_sev),
    ("REPORT_HEAT", heat_png),
    ("REPORT_TREND", trend_png),
    ("REPORT_DEP", dep_png),
    ("REPORT_TA", ta_png),
)

for out_key, out_path in report_outputs:
    print(f"{out_key}={out_path}")

 

import os
import re
from pathlib import Path

# Prune old report artifacts: keep the md/png files of the 10 newest timestamps
# found in history/ and delete the rest.
hist = Path("history")
hist.mkdir(exist_ok=True)

# Match any timestamped report artifact: <name>-YYYYMMDD-HHMMSS.(md|png).
# The name prefix is deliberately open, because the report step writes files
# such as severity-report-<ts>.md and issue-activity-<ts>.png, which a pattern
# fixed to "severity-by-module-" would never select for pruning.
# NOTE(review): assumes the report timestamp is formatted YYYYMMDD-HHMMSS —
# confirm against where `ts` is built.
pat = re.compile(r"^[A-Za-z0-9_-]+-(\d{8}-\d{6})\.(md|png)$")

# timestamp -> every matching file carrying that timestamp
groups = {}
for p in hist.iterdir():
    m = pat.match(p.name)
    if not m or not p.is_file():
        continue
    groups.setdefault(m.group(1), []).append(p)

# Keep newest 10 timestamps (the fixed-width stamp sorts chronologically as text)
timestamps = sorted(groups, reverse=True)
keep = set(timestamps[:10])
drop = [p for stamp, files in groups.items() if stamp not in keep for p in files]

for p in drop:
    try:
        p.unlink()
    except FileNotFoundError:
        # Already removed by something else; nothing left to prune.
        pass

print(f"Kept {len(keep)} report sets; pruned {len(drop)} files.")

 

--- 

This produces sample output including the various json and csv files as mentioned above. We list just one of them: 

                  metric       count 

0               #opened   8 

1               #closed     8 
Care must be taken not to run into GitHub API rate limits. For example, the API may respond with:

{"message": "API rate limit exceeded for <client-ip-address>", "documentation_url": "https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}

Wednesday, April 1, 2026

 This is a summary of the book titled “Between You and AI: Unlock the Power of Human Skills to Thrive in an AI-Driven World” written by Andrea Iorio and published by Wiley, 2025. This book argues that the most durable advantage in an AI-saturated workplace comes from combining machine efficiency with distinctly human judgment.

Iorio frames AI as a powerful accelerator of structured work—searching, summarizing, classifying, drafting, and pattern-finding—but he cautions that automation alone rarely differentiates a person or an organization. He points to forecasts that a substantial share of work may be automated and notes that the winners will be those who use AI to amplify what machines do not supply on their own: meaning, context, relationships, ethical reflection, and creative reframing.

A “hybrid” skill set—delegating well-bounded tasks to AI while strengthening emotional intelligence, critical thinking, and creativity—is a must. “The way forward is not about choosing between AI and human expertise — it is about integrating both into a new hybrid set of skills that leverages the best of each.” To illustrate, Iorio revisits the moment IBM’s Deep Blue defeated chess champion Garry Kasparov and the subsequent rise of “advanced chess,” where human players use AI analysis but still shape strategy and decide when to depart from the model’s suggestions.

He extends that example to everyday business decisions. At Nubank, for instance, customer service representatives work with an AI co-pilot that offers real-time suggestions. The system improves speed and consistency, while the human agent contributes empathy and situational awareness—qualities that matter when someone is frustrated, confused, or dealing with a sensitive issue.

Because AI can surface information that once required years of specialized study, Iorio argues that advantage increasingly comes from knowing how to work with these systems, not from memorizing what they can retrieve. He cites research from his team suggesting many leaders would rather collaborate with someone who can use AI well to find and synthesize answers than with someone who relies on expertise alone. In that sense, prompting becomes a practical craft: “The more thought you put into your prompt from the start, the more time and productivity you will save later.”

When Iorio turns to prompt design, his guidance is straightforward: be deliberate about the role you want the system to play, the specificity of the question, the context that shapes what “good” looks like, and the format you need back. Instead of asking for a generic report, you might ask the model to respond as a consultant, define the industry and constraints, describe the audience, and request an output structure that you can review and refine.

From there, the book emphasizes what Iorio calls “data sensemaking.” AI can process huge volumes of information, detect patterns, and generate predictions, but it cannot decide what matters most in a particular environment. Sensemaking means choosing the questions worth asking, defining indicators that connect to real decisions, and interpreting outputs in light of goals, constraints, and lived experience. It also includes actively looking for surprising relationships in the data, distinguishing vanity metrics from signals that should change priorities, and connecting past performance to leading indicators that hint at where the market is moving.

In 1997, IBM’s Deep Blue beat then-undefeated chess champion Garry Kasparov. In the aftermath of his loss, Kasparov began playing Advanced Chess, in which human players collaborate with an AI. Players consider the AI’s advice and intervene with their own strategies.

Historically, people gained a competitive advantage by acquiring highly specialized knowledge. For example, lawyers charge high fees because they dedicate years to becoming experts in the law. But nowadays, AIs such as GPT-4 can pass bar exams and explain legal matters, such as data privacy policies, to laypeople. This doesn’t mean human lawyers — or other human experts — are going away. However, according to a survey by Andrea Iorio and his team, nearly 60% of leaders would rather collaborate with people skilled in using AI to find answers than with people who have strong expertise but don’t use AI.

Sensemaking also requires skepticism about where outputs come from and how they generalize. Iorio notes that models can overreach when information is thin or when training data reflects historical bias. The remedy he recommends is continuous review: checking whether data is current, whether it represents the populations affected by the decision, documenting known limitations, and building human review into workflows—especially where the stakes are high.

Another theme is “reperception,” Iorio’s term for deliberately letting go of inherited assumptions to make room for new possibilities. He describes common cognitive traps—such as seeking only confirming evidence, getting overwhelmed by abundant information, defaulting to familiar “safe” strategies, and mistaking slow early progress for a sign that change will never accelerate. In practice, reperception can look like intentionally exposing yourself to viewpoints outside your usual feed, using frameworks to narrow attention to what is truly decision-relevant, and regularly posing questions that challenge what you take for granted.

To show how a single “impossible” question can reopen a problem, Iorio retells the story of Edwin Land being asked by his young daughter why she could not see a photo immediately—a moment that helped spur the invention of instant photography. He pairs that mindset shift with adaptability: noticing emerging curves early and acting on what you learn. John Deere, for example, moved beyond selling equipment toward using sensors and AI to provide farmers with guidance on planting and yield, expanding into software and services rather than relying only on its historical product line.

Iorio then draws on the concept of “antifragility”: not merely withstanding shocks, but improving because of them. Citing research on decades of innovative projects, he argues that failure is a common feature of eventual success when teams extract lessons quickly and apply them to the next iteration. AI, in his view, can lower the cost of learning by helping prevent routine errors through automation, manage unavoidable risks through prediction and monitoring, and accelerate experimentation by analyzing patterns across large sets of past failures.

He highlights how simulation and pattern analysis can compress feedback loops. Automotive firms that once relied on a limited number of expensive physical crash tests can now run many virtual scenarios, learn faster, and refine designs earlier. In a different domain, NotCo’s AI system, “Giuseppe,” searches through vast ingredient combinations to propose plant-based recipes that human teams can then test and adjust, turning unusual suggestions into practical prototypes.

Plant-based food developer NotCo developed a proprietary AI, “Giuseppe,” that analyzes the texture, structure, and flavor properties of 300,000 potential ingredients and suggests recipes for vegan products. Even though some of “Giuseppe’s” ideas seem unusual — like using pineapple and cauliflower as part of plant-based milk — the AI allowed NotCo to generate and test new products, such as a vegan custard for Shake Shack, in far less time than traditional approaches required.

A later section focuses on trust. Iorio notes that people are often wary of AI in sensitive settings such as healthcare, even when the technology can improve detection and treatment. He describes research in multiple sclerosis care in which systems can scan records and imaging for patterns clinicians might miss, and he argues that the value of such tools depends on making their use understandable and accountable to the people affected.

A 2023 study by the Pew Research Center found that 60% of Americans are concerned about their medical providers using AI. AI can significantly improve the diagnosis and treatment of diseases, particularly for complex conditions that don’t have a definitive test, such as multiple sclerosis (MS). People with MS may experience a variety of symptoms, including blurry vision and difficulty walking. They often visit different specialists for each problem. Research published in a 2023 issue of the International Journal of MS Care found that AI can seek patterns across health records and identify signs of MS that individual doctors might overlook. Researchers at University College London used the AI tool MindGlide to detect patterns in MS patients’ MRI scans and — in a matter of seconds — recommend treatment plans that are most likely to be effective.

He returns repeatedly to the “black box” problem: when a model produces an output that neither users nor even developers can readily explain, organizations may not be able to justify decisions or detect errors. For regulated decisions—such as credit and lending—he points to the importance of transparency and “explainable AI,” meaning systems and processes that allow humans to trace the logic, challenge results, and correct them when necessary.

Finally, Iorio argues that responsibility cannot be delegated to a tool. Using the 2018 fatal crash involving an Uber self-driving vehicle as an example, he shows how accountability tends to fall back on humans and organizations even when automated systems are involved. “AI can execute, but it cannot care… it cannot be held responsible.” For that reason, he recommends defining who owns AI-assisted decisions, building checkpoints for human review, and testing outputs against organizational values so that efficiency does not override fairness, safety, or long-term trust.

Andrea Iorio hosts the Metanoia Lab podcast and NVIDIA’s Vem AI podcast in Brazil. He is an MBA professor at Fundação Dom Cabral, a columnist for MIT Technology Review Brazil, and a frequent speaker on AI and leadership.