The following is sample code for generating custom insights, on a periodic basis, from GitHub issues opened against a repository:
#! /usr/bin/python
# Script 1: collect closed issues (with comments and referenced PR URLs)
# and dump them to issues.json for the downstream analysis scripts.
import os, requests, json, datetime, re
# Required environment: REPO ("owner/repo") and GH_TOKEN (a token with repo read access).
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
# Look-back window in days; defaults to one week.
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS","7"))
# NOTE(review): "X-GitHub-Api-Version: 2026-03-10" looks like a placeholder —
# GitHub's documented version string is e.g. "2022-11-28"; confirm before use.
# The mockingbird-preview Accept type enables the issue timeline API.
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json", "X-GitHub-Api-Version": "2026-03-10"}
# ISO-8601 UTC timestamp marking the start of the window ("Z" suffix appended manually).
since = (datetime.datetime.utcnow() - datetime.timedelta(days=WINDOW_DAYS)).isoformat() + "Z"
# ---- Helpers ----
def gh_get(url, params=None):
    """GET a GitHub API URL and return the parsed JSON body.

    Raises requests.HTTPError on non-2xx responses. A timeout is set so a
    stalled connection cannot hang an unattended periodic run.
    """
    r = requests.get(url, headers=HEADERS, params=params, timeout=30)
    r.raise_for_status()
    return r.json()
def gh_get_text(url):
    """GET a URL (e.g. a raw-content download_url) and return the body text.

    Raises requests.HTTPError on non-2xx responses; times out rather than
    hanging, matching gh_get().
    """
    r = requests.get(url, headers=HEADERS, timeout=30)
    r.raise_for_status()
    return r.text
# ---- Fetch closed issues (not PRs) updated in the window, with comments ----
# Bug fix: the previous version fetched only the first page (max 100 items);
# we now paginate until a short page is returned.
issues_url = f"https://api.github.com/repos/{REPO}/issues"
issues = []
page = 1
while True:
    items = gh_get(issues_url, params={"state": "closed", "since": since, "per_page": 100, "page": page})
    if not items:
        break
    for i in items:
        if "pull_request" in i:
            continue  # the issues API mixes in PRs; skip them
        comments = gh_get(i["comments_url"], params={"per_page": 100})
        pr_urls = set()
        for c in comments:
            body = c.get("body", "") or ""
            # Full PR URLs mentioned in comment text.
            for m in re.findall(r"https://github\.com/[^/\s]+/[^/\s]+/pull/\d+", body):
                pr_urls.add(m)
            # Bare "#123" shorthand is assumed to reference a PR in this repo.
            for m in re.findall(r"(?:^|\s)#(\d+)\b", body):
                pr_urls.add(f"https://github.com/{REPO}/pull/{m}")
        issues.append({
            "number": i["number"],
            "title": i.get("title", ""),
            "user": i.get("user", {}).get("login", ""),
            "created_at": i.get("created_at"),
            "closed_at": i.get("closed_at"),
            "html_url": i.get("html_url"),
            "comments": [{"id": c.get("id"), "body": c.get("body", ""), "created_at": c.get("created_at")} for c in comments],
            "pr_urls": sorted(pr_urls)
        })
    if len(items) < 100:
        break  # short page => last page
    page += 1
with open("issues.json", "w") as f:
    json.dump(issues, f, indent=2)
print(f"WROTE_ISSUES={len(issues)}")
# Script 2: count issues opened vs closed in the window -> issue_activity.csv.
import os, requests, datetime, pandas as pd
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
WINDOW_DAYS = int(os.environ.get("WINDOW_DAYS", "7"))
headers = {
"Authorization": f"Bearer {TOKEN}",
"Accept": "application/vnd.github+json",
}
# Window start as an ISO-8601 UTC timestamp (manual "Z" suffix).
since = (datetime.datetime.utcnow() - datetime.timedelta(days=WINDOW_DAYS)).isoformat() + "Z"
url = f"https://api.github.com/repos/{REPO}/issues"
def fetch(state):
    """Return all issues (PRs filtered out) in the given state since `since`.

    Bug fix: the old code broke out of the loop whenever the *filtered*
    batch was empty, so a page consisting only of PRs silently truncated
    pagination. We now paginate on the raw page size instead.
    """
    items = []
    page = 1
    while True:
        r = requests.get(
            url,
            headers=headers,
            params={"state": state, "since": since, "per_page": 100, "page": page},
        )
        r.raise_for_status()
        raw = r.json()
        items.extend(i for i in raw if "pull_request" not in i)
        if len(raw) < 100:
            break  # short (or empty) raw page => last page
        page += 1
    return items
# Fetch both states and write a tiny two-row summary CSV for the report step.
opened = fetch("open")
closed = fetch("closed")
df = pd.DataFrame(
[
{"metric": "opened", "count": len(opened)},
{"metric": "closed", "count": len(closed)},
]
)
df.to_csv("issue_activity.csv", index=False)
print(df)
# Script 3: map closed issues -> linked PRs -> touched Terraform module "terms".
import os, re, json, datetime, requests
import hcl2
import pandas as pd
# NOTE: this script reads GITHUB_REPOSITORY (the Actions-provided variable),
# unlike scripts 1/2 which read REPO — confirm both are set consistently.
REPO = os.environ["GITHUB_REPOSITORY"]
GH_TOKEN = os.environ["GH_TOKEN"]
# NOTE(review): "X-GitHub-Api-Version: 2026-03-10" looks like a placeholder —
# GitHub documents versions like "2022-11-28"; verify. The mockingbird-preview
# Accept type is kept for issue-timeline compatibility.
HEADERS = {"Authorization": f"Bearer {GH_TOKEN}", "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json", "X-GitHub-Api-Version": "2026-03-10"}
# ---- Time window (last 7 days) ----
since = (datetime.datetime.utcnow() - datetime.timedelta(days=7)).isoformat() + "Z"
# ---- Helpers ----
def list_closed_issues():
    """Return all closed issues (PRs excluded) updated since the window start.

    Bug fix: the issues API is paginated; the old code fetched only the
    first 100 results. We now loop until a short page is returned.
    """
    url = f"https://api.github.com/repos/{REPO}/issues"
    out = []
    page = 1
    while True:
        items = gh_get(url, params={"state": "closed", "since": since, "per_page": 100, "page": page})
        if not items:
            break
        # The endpoint returns both issues and PRs; keep only real issues.
        out.extend(i for i in items if "pull_request" not in i)
        if len(items) < 100:
            break
        page += 1
    return out
# Patterns for PR references in free text. Each named group (owner/repo/num)
# feeds _normalize_html_pr_url() below.
PR_HTML_URL_RE = re.compile(
r"https?://github\.com/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pull/(?P<num>\d+)",
re.IGNORECASE,
)
PR_API_URL_RE = re.compile(
r"https?://api\.github\.com/repos/(?P<owner>[^/\s]+)/(?P<repo>[^/\s]+)/pulls/(?P<num>\d+)",
re.IGNORECASE,
)
# Shorthand references that might appear in text:
# - #123 (assumed to be same repo)
# - owner/repo#123 (explicit cross-repo)
SHORTHAND_SAME_REPO_RE = re.compile(r"(?<!\w)#(?P<num>\d+)\b")
SHORTHAND_CROSS_REPO_RE = re.compile(
r"(?P<owner>[A-Za-z0-9_.-]+)/(?P<repo>[A-Za-z0-9_.-]+)#(?P<num>\d+)\b"
)
def _normalize_html_pr_url(owner: str, repo: str, num: int) -> str:
return f"https://github.com/{owner}/{repo}/pull/{int(num)}"
def _collect_from_text(text: str, default_owner: str, default_repo: str) -> set:
    """Extract candidate PR URLs from free text (body/comments/events text)."""
    found = set()
    if not text:
        return found
    # HTML URLs, API URLs, and cross-repo "owner/repo#123" shorthand all
    # expose owner/repo/num named groups, so one loop covers the three.
    for pattern in (PR_HTML_URL_RE, PR_API_URL_RE, SHORTHAND_CROSS_REPO_RE):
        for m in pattern.finditer(text):
            found.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))
    # Bare "#123" shorthand is attributed to the default repo.
    for m in SHORTHAND_SAME_REPO_RE.finditer(text):
        found.add(_normalize_html_pr_url(default_owner, default_repo, m.group("num")))
    return found
def _paginate_gh_get(url, headers=None, per_page=100):
    """Generator: fetch all pages until fewer than per_page items are returned.

    Note: ``headers`` is retained for interface compatibility but is not
    forwarded — gh_get() always applies the module-level HEADERS itself.
    """
    page = 1
    while True:
        data = gh_get(url, params={"per_page": per_page, "page": page})
        # Defensive: non-list payloads (e.g. an error dict) end iteration.
        if not isinstance(data, list) or not data:
            break
        yield from data
        if len(data) < per_page:
            break  # short page => last page
        page += 1
def extract_pr_urls_from_issue(issue_number: int):
    """
    Extract PR URLs associated with an issue by scanning:
      - the issue body
      - all issue comments
      - issue events ('mentioned', 'cross-referenced', ...)
      - the issue timeline (most reliable for cross references)
    Returns a sorted list of unique, normalized HTML PR URLs.
    Requires module-level REPO ("owner/repo") and gh_get().

    Changes vs. the original: an unused ``base_headers`` local was removed
    (gh_get() already sends the module HEADERS, including the timeline
    preview Accept type), and two cross-reference branches that added the
    same URL were merged.
    """
    owner, repo = REPO.split("/", 1)
    pr_urls = set()
    # 1) Issue body
    issue_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}"
    issue = gh_get(issue_url)
    if isinstance(issue, dict):
        pr_urls |= _collect_from_text(issue.get("body") or "", owner, repo)
    # 2) All comments
    comments_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}/comments"
    for c in _paginate_gh_get(comments_url):
        pr_urls |= _collect_from_text(c.get("body") or "", owner, repo)
    # 3) Issue events ('mentioned', 'cross-referenced', etc.)
    events_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}/events"
    for ev in _paginate_gh_get(events_url):
        if not isinstance(ev, dict):
            continue
        # (a) Free-text fields some events carry.
        pr_urls |= _collect_from_text(ev.get("body") or "", owner, repo)
        # (b) Structured cross-references: the referenced issue's html_url
        # containing "/pull/" identifies it as a PR (with or without the
        # explicit 'pull_request' marker — both original branches added it).
        if ev.get("event") == "cross-referenced":
            issue_obj = (ev.get("source") or {}).get("issue") or {}
            html_url = issue_obj.get("html_url")
            if html_url and "/pull/" in html_url:
                pr_urls.add(html_url)
    # 4) Timeline API (the most complete source of references)
    timeline_url = f"https://api.github.com/repos/{REPO}/issues/{issue_number}/timeline"
    for item in _paginate_gh_get(timeline_url):
        if not isinstance(item, dict):
            continue
        # Free-text scan over any plausible string field.
        for key in ("body", "message", "title", "commit_message", "subject"):
            val = item.get(key)
            if isinstance(val, str):
                pr_urls |= _collect_from_text(val, owner, repo)
        # Structured cross-reference payloads.
        if item.get("event") == "cross-referenced":
            issue_obj = (item.get("source") or {}).get("issue") or {}
            html_url = issue_obj.get("html_url")
            if html_url and "/pull/" in html_url:
                pr_urls.add(html_url)
        # Some timeline items are themselves issues/PRs with an html_url.
        html_url = item.get("html_url")
        if isinstance(html_url, str) and "/pull/" in html_url:
            pr_urls.add(html_url)
        # Occasionally the timeline includes API-style URLs.
        api_url = item.get("url")
        if isinstance(api_url, str):
            m = PR_API_URL_RE.search(api_url)
            if m:
                pr_urls.add(_normalize_html_pr_url(m.group("owner"), m.group("repo"), m.group("num")))
    # Final normalization: keep only canonical HTML PR URLs, sorted.
    normalized = set()
    for candidate in pr_urls:
        m = PR_HTML_URL_RE.search(candidate)
        if m:
            normalized.add(m.group(0))
    return sorted(normalized)
def pr_number_from_url(u):
    """Return the numeric PR id from a '/pull/<n>' URL, or None if absent."""
    match = re.search(r"/pull/(\d+)", u)
    if match is None:
        return None
    return int(match.group(1))
def list_pr_files(pr_number):
    """Return all files touched by a PR (paginated, 100 per page).

    Improvement: stop when a short page is returned instead of issuing one
    extra request per PR just to see an empty page — relevant for staying
    under API rate limits.
    """
    url = f"https://api.github.com/repos/{REPO}/pulls/{pr_number}/files"
    files = []
    page = 1
    while True:
        batch = gh_get(url, params={"per_page": 100, "page": page})
        if not batch:
            break
        files.extend(batch)
        if len(batch) < 100:
            break  # short page => last page; saves a round-trip
        page += 1
    return files
def get_pr_head_sha(pr_number):
    """Return the head commit SHA of the given pull request."""
    pr = gh_get(f"https://api.github.com/repos/{REPO}/pulls/{pr_number}")
    return pr["head"]["sha"]
def get_file_at_sha(path, sha):
    """Fetch a file's text at a specific ref via the contents API.

    Returns None when the path is absent at that ref, or when the entry is
    not a plain file with a usable download URL.
    """
    url = f"https://api.github.com/repos/{REPO}/contents/{path}"
    r = requests.get(url, headers=HEADERS, params={"ref": sha})
    if r.status_code == 404:
        return None
    r.raise_for_status()
    data = r.json()
    if not isinstance(data, dict) or data.get("type") != "file":
        return None
    download = data.get("download_url")
    if not download:
        return None
    return gh_get_text(download)
def extract_module_term_from_source(src: str) -> str | None:
"""
Given a module 'source' string, return the last path segment between the
final '/' and the '?' (or end of string if '?' is absent).
Examples:
git::https://...//modules/container/kubernetes-service?ref=v4.0.15 -> 'kubernetes-service'
../modules/network/vnet -> 'vnet'
registry- or other sources with no '/' -> returns None
"""
if not isinstance(src, str) or not src:
return None
# Strip query string
path = src.split('?', 1)[0]
# For git:: URLs that include a double-slash path component ("//modules/..."),
# keep the right-most path component regardless of scheme.
# Normalize backslashes just in case.
path = path.replace('\\', '/')
# Remove trailing slashes
path = path.rstrip('/')
# Split and take last non-empty part
parts = [p for p in path.split('/') if p]
if not parts:
return None
return parts[-1]
def parse_module_terms_from_tf(tf_text):
    """
    Parse HCL to find module blocks and return the set of module 'terms'
    extracted from their 'source' attribute (last segment before '?').

    Improvement: delegates source discovery to parse_module_sources_from_tf()
    instead of duplicating the whole hcl2 traversal, so the two parsers
    cannot drift apart.
    """
    terms = set()
    for src in parse_module_sources_from_tf(tf_text):
        term = extract_module_term_from_source(src)
        if term:
            terms.add(term)
    return terms
def parse_module_sources_from_tf(tf_text):
    """Extract the 'source' strings of all module blocks in HCL text.

    Returns an empty set when the text cannot be parsed as HCL.
    Improvement: the previous list/dict branches were byte-for-byte
    duplicates; the shape is now normalized once and handled in one loop.
    """
    sources = set()
    try:
        obj = hcl2.loads(tf_text)
    except Exception:
        return sources  # unparseable .tf: best-effort, report nothing
    mods = obj.get("module", [])
    # hcl2 usually yields a list of {name: body} dicts, but tolerate a
    # plain {name: body} mapping as well.
    if isinstance(mods, dict):
        entries = [mods]
    elif isinstance(mods, list):
        entries = mods
    else:
        entries = []
    for item in entries:
        if not isinstance(item, dict):
            continue
        for body in item.values():
            if isinstance(body, dict):
                src = body.get("source")
                if isinstance(src, str):
                    sources.add(src)
    return sources
def normalize_local_module_path(source, app_dir):
    """Resolve a relative module source against app_dir using POSIX rules.

    Registry/git/http sources are ignored (returns None); only in-repo
    "./" and "../" paths are resolved. app_dir looks like "workload/appA".
    """
    import posixpath
    if not (source.startswith("./") or source.startswith("../")):
        return None
    return posixpath.normpath(posixpath.join(app_dir, source))
def list_repo_tf_files_under(dir_path, sha):
    """List .tf file paths under dir_path at the given commit.

    Uses `git ls-tree` against the sha for accuracy (the worktree may be on
    a different ref). Best-effort: any failure yields an empty list.
    """
    import subprocess
    try:
        listing = subprocess.check_output(
            ["git", "ls-tree", "-r", "--name-only", sha, dir_path], text=True
        )
        return [line.strip() for line in listing.splitlines() if line.strip().endswith(".tf")]
    except Exception:
        return []
def collect_module_terms_for_app(app_dir, sha):
    """
    Scan all .tf files in the app dir at the PR head sha and extract:
      1) module terms directly used by the app
      2) for any local module sources, one level of recursion into those
         module directories to pick up the terms they define
    """
    terms = set()
    local_module_dirs = set()
    for path in list_repo_tf_files_under(app_dir, sha):
        text = get_file_at_sha(path, sha)
        if not text:
            continue
        # Terms used directly by the app.
        terms |= parse_module_terms_from_tf(text)
        # Remember local modules so their contents get scanned too.
        for src in parse_module_sources_from_tf(text):
            resolved = normalize_local_module_path(src, app_dir)
            if resolved:
                local_module_dirs.add(resolved)
    # One level deep: scan each referenced local module directory.
    for module_dir in sorted(local_module_dirs):
        for path in list_repo_tf_files_under(module_dir, sha):
            text = get_file_at_sha(path, sha)
            if text:
                terms |= parse_module_terms_from_tf(text)
    return terms
# ---- Main: issues -> PRs -> touched apps -> module terms ----
issues = list_closed_issues()
issue_to_terms = {}  # issue_number -> sorted list of module terms
for issue in issues:
    inum = issue["number"]
    # Fix: call pr_number_from_url once per URL (it was evaluated twice).
    pr_numbers = sorted({n for n in (pr_number_from_url(u) for u in extract_pr_urls_from_issue(inum)) if n})
    if not pr_numbers:
        continue
    terms_for_issue = set()
    for prn in pr_numbers:
        sha = get_pr_head_sha(prn)
        files = list_pr_files(prn)
        # Identify which workload apps are touched by this PR; multiple
        # application folders live within "workload/".
        touched_apps = set()
        for f in files:
            path = f.get("filename", "")
            if not path.startswith("workload/"):
                continue
            parts = path.split("/")
            if len(parts) >= 2:
                touched_apps.add("/".join(parts[:2]))  # workload/<app>
        # For each touched app, compute module terms (app + local modules).
        for app_dir in sorted(touched_apps):
            terms_for_issue |= collect_module_terms_for_app(app_dir, sha)
    if terms_for_issue:
        issue_to_terms[inum] = sorted(terms_for_issue)
# Severity rows: one row per (issue, module term) pair.
rows = [{"issue": inum, "module_term": t} for inum, terms in issue_to_terms.items() for t in set(terms)]
print(f"rows={len(rows)}")
df = pd.DataFrame(rows)
df.to_csv("severity_data.csv", index=False)
# Also write a compact JSON for debugging/audit.
with open("issue_to_module_terms.json", "w") as f:
    json.dump(issue_to_terms, f, indent=2, sort_keys=True)
print(f"Closed issues considered: {len(issues)}")
print(f"Issues with PR-linked module impact: {len(issue_to_terms)}")
# Script 4: recompute module terms + turnaround from the saved issues.json
# (avoids re-scanning issue comments via the API).
import os, json, re, requests, subprocess
import hcl2
REPO = os.environ["REPO"]
TOKEN = os.environ["GH_TOKEN"]
# NOTE(review): "X-GitHub-Api-Version: 2026-03-10" looks like a placeholder
# (documented versions look like "2022-11-28"); confirm before use.
HEADERS = {"Authorization": f"Bearer {TOKEN}", "Accept": "application/vnd.github+json, application/vnd.github.mockingbird-preview+json", "X-GitHub-Api-Version": "2026-03-10"}
# Fix: the load must happen inside the `with` block (body was unindented).
with open("issues.json") as f:
    issues = json.load(f)
issue_to_terms = {}
issue_turnaround = {}
module_deps = {}  # app_dir -> set(module paths it references)
# Fix: the datetime import was executed inside the loop on every iteration.
from datetime import datetime

for issue in issues:
    inum = issue["number"]
    created = issue.get("created_at")
    closed = issue.get("closed_at")
    # Turnaround in fractional days; None when timestamps are missing/malformed.
    delta_days = None
    if created and closed:
        fmt = "%Y-%m-%dT%H:%M:%SZ"
        try:
            dt_created = datetime.strptime(created, fmt)
            dt_closed = datetime.strptime(closed, fmt)
            delta_days = (dt_closed - dt_created).total_seconds() / 86400.0
        except Exception:
            delta_days = None
    issue_turnaround[inum] = delta_days
    pr_urls = issue.get("pr_urls", [])
    # Fix: evaluate pr_number_from_url once per URL instead of twice.
    pr_numbers = sorted({n for n in (pr_number_from_url(u) for u in pr_urls) if n})
    terms_for_issue = set()
    for prn in pr_numbers:
        sha = get_pr_head_sha(prn)
        files = list_pr_files(prn)
        touched_apps = set()
        for f in files:
            path = f.get("filename", "")
            if path.startswith("workload/"):
                parts = path.split("/")
                if len(parts) >= 2:
                    touched_apps.add("/".join(parts[:2]))
        for app_dir in sorted(touched_apps):
            terms_for_issue |= collect_module_terms_for_app(app_dir, sha)
            # Collect module sources at the PR head for the dependency graph.
            for p in list_repo_tf_files_under(app_dir, sha):
                txt = get_file_at_sha(p, sha)
                if not txt:
                    continue
                for src in parse_module_sources_from_tf(txt):
                    local = normalize_local_module_path(src, app_dir)
                    if local:
                        module_deps.setdefault(app_dir, set()).add(local)
    if terms_for_issue:
        issue_to_terms[inum] = sorted(terms_for_issue)
# ---- Persist per-issue outputs (CSV for charts, JSON for audit) ----
import pandas as pd

rows = [{"issue": inum, "module_term": t} for inum, terms in issue_to_terms.items() for t in set(terms)]
df = pd.DataFrame(rows)
df.to_csv("severity_data.csv", index=False)
ta_rows = [{"issue": inum, "turnaround_days": days} for inum, days in issue_turnaround.items()]
pd.DataFrame(ta_rows).to_csv("turnaround.csv", index=False)
# Fix: the json.dump calls must live inside their `with` blocks
# (bodies were unindented in the original paste).
with open("issue_to_module_terms.json", "w") as f:
    json.dump(issue_to_terms, f, indent=2)
with open("issue_turnaround.json", "w") as f:
    json.dump(issue_turnaround, f, indent=2)
with open("module_deps.json", "w") as f:
    json.dump({k: sorted(v) for k, v in module_deps.items()}, f, indent=2)
print(f"ISSUES_WITH_TYPES={len(issue_to_terms)}")
# Script 5: render charts and the markdown report from the CSV/JSON artifacts.
import os, json, datetime, glob
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import networkx as nx
# One timestamp per run; embedded in every artifact filename so complete
# report sets can be grouped and pruned later.
ts = datetime.datetime.utcnow().strftime("%Y%m%d-%H%M%S")
os.makedirs("history", exist_ok=True)
def read_csv(file_name):
    """Read file_name into a DataFrame; return None when missing or unreadable.

    Bug fix: the parameter was previously ignored — the function always read
    "severity_data.csv" no matter which file the caller asked for, silently
    feeding the wrong data to the heatmap/trend/turnaround charts.
    """
    if not os.path.exists(file_name):
        return None
    try:
        return pd.read_csv(file_name)
    except Exception:
        return None
# --- Severity bar: number of distinct issues per module term ---
if os.path.exists("severity_data.csv"):
    df = read_csv("severity_data.csv")
    # Fix: compare to None with `is`, not `==` (== on a DataFrame is
    # elementwise and would raise inside `if`).
    if df is None:
        df = pd.DataFrame(columns=["issue", "module_term"])
    counts = df.groupby("module_term")["issue"].nunique().sort_values(ascending=False)
else:
    counts = pd.Series(dtype=int)
png_sev = f"history/severity-by-module-{ts}.png"
plt.figure(figsize=(12, 6))
if not counts.empty:
    counts.plot(kind="bar")
    plt.title("Issue frequency by module term")
    plt.xlabel("module_term")
    plt.ylabel("number of closed issues touching module term")
else:
    # Placeholder chart so the report always has an image to embed.
    plt.text(0.5, 0.5, "No module-impacting issues in window", ha="center", va="center")
    plt.axis("off")
plt.tight_layout()
plt.savefig(png_sev)
plt.clf()
# --- Heatmap: module_term x issue (counts) ---
heat_png = f"history/heatmap-module-issues-{ts}.png"
if os.path.exists("severity_data.csv"):
    mat = read_csv("severity_data.csv")
    # Fix: `if not mat:` raises ValueError on a DataFrame (ambiguous truth
    # value); test explicitly for the None sentinel read_csv returns.
    if mat is None:
        mat = pd.DataFrame(columns=["issue", "module_term"])
    if not mat.empty:
        pivot = mat.pivot_table(index="module_term", columns="issue", aggfunc='size', fill_value=0)
        # Sort rows by total incidence so the hottest modules come first.
        pivot['total'] = pivot.sum(axis=1)
        pivot = pivot.sort_values('total', ascending=False).drop(columns=['total'])
        # Limit columns for readability.
        if pivot.shape[1] > 100:
            pivot = pivot.iloc[:, :100]
        plt.figure(figsize=(14, max(6, 0.2 * pivot.shape[0])))
        sns.heatmap(pivot, cmap="YlOrRd", cbar=True)
        plt.title("Heatmap: module terms (rows) vs issues (columns)")
        plt.xlabel("Issue number (truncated)")
        plt.ylabel("module terms")
        plt.tight_layout()
        plt.savefig(heat_png)
        plt.clf()
    else:
        plt.figure(figsize=(6, 2))
        plt.text(0.5, 0.5, "No data for heatmap", ha="center", va="center")
        plt.axis("off")
        plt.savefig(heat_png)
        plt.clf()
else:
    plt.figure(figsize=(6, 2))
    plt.text(0.5, 0.5, "No data for heatmap", ha="center", va="center")
    plt.axis("off")
    plt.savefig(heat_png)
    plt.clf()
# --- Trend lines: aggregate historical severity_data CSVs in history/ ---
import re  # fix: was imported inside the loop on every iteration

trend_png = f"history/trendlines-module-{ts}.png"
# Collect historical CSVs that match the severity_data naming patterns.
hist_files = sorted(glob.glob("history/*severity-data-*.csv") + glob.glob("history/*severity_data.csv") + glob.glob("history/*severity-by-module-*.csv"))
# Also include the current run's severity_data.csv.
if os.path.exists("severity_data.csv"):
    hist_files.append("severity_data.csv")
# Build per-snapshot counts per module term, labelled by the timestamp
# embedded in the filename (fall back to the file's mtime).
trend_df = pd.DataFrame()
for fname in hist_files:
    try:
        m = re.search(r"(\d{8}-\d{6})", fname)
        ts_label = m.group(1) if m else os.path.getmtime(fname)
        tmp = read_csv(fname)
        # Fix: `tmp == None` is an elementwise DataFrame comparison; use `is`.
        if tmp is None or tmp.empty:
            continue
        # Fix: the CSV column is "module_term" (singular); grouping by the
        # nonexistent "module_terms" raised KeyError and skipped every file.
        counts_tmp = tmp.groupby("module_term")["issue"].nunique().rename(ts_label)
        trend_df = pd.concat([trend_df, counts_tmp], axis=1)
    except Exception:
        continue  # best-effort: a bad historical file must not kill the report
if not trend_df.empty:
    trend_df = trend_df.fillna(0).T
    # Convert the index to datetime where possible (filename stamps first,
    # then mtime seconds for the fallback labels).
    try:
        trend_df.index = pd.to_datetime(trend_df.index, format="%Y%m%d-%H%M%S", errors='coerce').fillna(pd.to_datetime(trend_df.index, unit='s'))
    except Exception:
        pass
    plt.figure(figsize=(14, 6))
    # Plot the top 8 module terms by the latest snapshot's totals.
    latest = trend_df.iloc[-1].sort_values(ascending=False).head(8).index.tolist()
    for col in latest:
        plt.plot(trend_df.index, trend_df[col], marker='o', label=col)
    plt.legend(loc='best', fontsize='small')
    plt.title("Trend lines: issue frequency over time for top module_terms")
    plt.xlabel("time")
    plt.ylabel("issue count")
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(trend_png)
    plt.clf()
else:
    plt.figure(figsize=(8, 2))
    plt.text(0.5, 0.5, "No historical data for trend lines", ha="center", va="center")
    plt.axis("off")
    plt.savefig(trend_png)
    plt.clf()
# --- Dependency graph: directed app -> module edges from module_deps.json ---
dep_png = f"history/dependency-graph-{ts}.png"
if os.path.exists("module_deps.json"):
    with open("module_deps.json") as f:
        deps = json.load(f)
    G = nx.DiGraph()
    # Every app points at each local module it references.
    for app, mods in deps.items():
        G.add_node(app, type='app')
        for m in mods:
            G.add_node(m, type='module')
            G.add_edge(app, m)
    if len(G.nodes) == 0:
        plt.figure(figsize=(6, 2))
        plt.text(0.5, 0.5, "No dependency data", ha="center", va="center")
        plt.axis("off")
        plt.savefig(dep_png)
        plt.clf()
    else:
        plt.figure(figsize=(12, 8))
        pos = nx.spring_layout(G, k=0.5, iterations=50)
        # Apps in blue, modules in green.
        node_colors = ['#1f78b4' if G.nodes[n].get('type') == 'app' else '#33a02c' for n in G.nodes()]
        nx.draw_networkx_nodes(G, pos, node_size=600, node_color=node_colors)
        nx.draw_networkx_edges(G, pos, arrows=True, arrowstyle='->', arrowsize=12, edge_color='#888888')
        nx.draw_networkx_labels(G, pos, font_size=8)
        plt.title("Module dependency graph (apps -> local modules)")
        plt.axis('off')
        plt.tight_layout()
        plt.savefig(dep_png)
        plt.clf()
else:
    plt.figure(figsize=(6, 2))
    plt.text(0.5, 0.5, "No dependency data", ha="center", va="center")
    plt.axis("off")
    plt.savefig(dep_png)
    plt.clf()
# --- Turnaround chart: days from open to close, worst 50 issues ---
ta_png = f"history/turnaround-by-issue-{ts}.png"
if os.path.exists("turnaround.csv"):
    ta = read_csv("turnaround.csv")
    # Fix: use `is None` — `== None` on a DataFrame is elementwise.
    if ta is None:
        ta = pd.DataFrame(columns=["issue", "turnaround_days"])
    ta = ta.dropna(subset=["turnaround_days"])
    if not ta.empty:
        ta_sorted = ta.sort_values("turnaround_days", ascending=False).head(50)
        plt.figure(figsize=(12, 6))
        plt.bar(ta_sorted["issue"].astype(str), ta_sorted["turnaround_days"])
        plt.xticks(rotation=90)
        plt.title("Turnaround time (days) for closed issues in window")
        plt.xlabel("Issue number")
        plt.ylabel("Turnaround (days)")
        plt.tight_layout()
        plt.savefig(ta_png)
        plt.clf()
    else:
        plt.figure(figsize=(8, 2))
        plt.text(0.5, 0.5, "No turnaround data available", ha="center", va="center")
        plt.axis("off")
        plt.savefig(ta_png)
        plt.clf()
else:
    plt.figure(figsize=(8, 2))
    plt.text(0.5, 0.5, "No turnaround data available", ha="center", va="center")
    plt.axis("off")
    plt.savefig(ta_png)
    plt.clf()
# --- Issue activity chart: opened vs closed counts from script 2 ---
activity_png = f"history/issue-activity-{ts}.png"
if os.path.exists("issue_activity.csv"):
    act = pd.read_csv("issue_activity.csv")
    plt.figure(figsize=(6, 4))
    plt.bar(act["metric"], act["count"], color=["#1f78b4", "#33a02c"])
    plt.title("GitHub issue activity in last window")
    plt.xlabel("Issue state")
    plt.ylabel("Count")
    plt.tight_layout()
    plt.savefig(activity_png)
    plt.clf()
else:
    plt.figure(figsize=(6, 2))
    plt.text(0.5, 0.5, "No issue activity data", ha="center", va="center")
    plt.axis("off")
    plt.savefig(activity_png)
    plt.clf()
# --- AI summary input: condensed "who wants what" view of each issue ---
if os.path.exists("issues.json"):
    with open("issues.json") as f:
        issues = json.load(f)
else:
    issues = []
condensed = [
    {
        "number": i.get("number"),
        "user": i.get("user"),
        "title": i.get("title"),
        "html_url": i.get("html_url"),
    }
    for i in issues
]
with open("issues_for_ai.json", "w") as f:
    json.dump(condensed, f, indent=2)
# --- Optional AI summary via the OpenAI chat-completions API (curl) ---
import subprocess, os

OPENAI_KEY = os.environ.get("OPENAI_API_KEY")
ai_text = "AI summary skipped (no OPENAI_API_KEY)."
if OPENAI_KEY:
    prompt = ("You are given a JSON array of GitHub issues with fields: number, user, title, html_url. "
              "Produce a concise list of one-line 'who wants what' statements, one per issue, in plain text. "
              "Format: '#<number> — <user> wants <succinct request derived from title>'. "
              "Do not add commentary.")
    payload = {
        "model": "gpt-4o-mini",
        "messages": [
            {"role": "system", "content": "You are a concise summarizer."},
            # Fix: "\\n" put literal backslash-n characters into the prompt;
            # real newlines are intended between the instructions and JSON.
            # The JSON blob is truncated to stay within the model context.
            {"role": "user", "content": prompt + "\n\nJSON:\n" + json.dumps(condensed)[:15000]},
        ],
        "temperature": 0.2,
        "max_tokens": 400,
    }
    proc = subprocess.run([
        "curl", "-sS", "https://api.openai.com/v1/chat/completions",
        "-H", "Content-Type: application/json",
        "-H", f"Authorization: Bearer {OPENAI_KEY}",
        "-d", json.dumps(payload)
    ], capture_output=True, text=True)
    if proc.returncode == 0 and proc.stdout:
        try:
            resp = json.loads(proc.stdout)
            ai_text = resp["choices"][0]["message"]["content"].strip()
        except Exception:
            ai_text = "AI summary unavailable (parsing error)."
# --- Write markdown report combining all visuals ---
md_path = f"history/severity-report-{ts}.md"

def _img_line(png_path):
    # Markdown image reference; report and images share the history/ dir,
    # so the link target is just the basename.
    name = os.path.basename(png_path)
    return f"![{name}]({name})\n\n"

# Fix: the original had five corrupted `f.write(" + ")\n\n")` lines (syntax
# errors) where the chart-image references belong; restored via _img_line().
with open(md_path, "w") as f:
    f.write("# Weekly Terraform module hotspot report\n\n")
    f.write(f"**Window (days):** {os.environ.get('WINDOW_DAYS','7')}\n\n")
    f.write("## AI Summary (who wants what)\n\n")
    f.write("```\n")
    f.write(ai_text + "\n")
    f.write("```\n\n")
    f.write("## GitHub issue activity (last window)\n\n")
    f.write(_img_line(activity_png))
    if os.path.exists("issue_activity.csv"):
        act = pd.read_csv("issue_activity.csv")
        f.write(act.to_markdown(index=False) + "\n\n")
    f.write("## Top module terms by issue frequency\n\n")
    if not counts.empty:
        f.write(_img_line(png_sev))
        f.write(counts.head(30).to_frame("issues").to_markdown() + "\n\n")
    else:
        f.write("No module-impacting issues found in the selected window.\n\n")
    f.write("## Heatmap: module terms vs issues\n\n")
    f.write(_img_line(heat_png))
    f.write("## Trend lines: historical issue frequency for top module terms\n\n")
    f.write(_img_line(trend_png))
    f.write("## Dependency graph: apps -> local modules\n\n")
    f.write(_img_line(dep_png))
    f.write("## Turnaround time for closed issues (days)\n\n")
    f.write(_img_line(ta_png))
    f.write("## Data artifacts\n\n")
    f.write("- `severity_data.csv` — per-issue module term mapping\n")
    f.write("- `turnaround.csv` — per-issue turnaround in days\n")
    f.write("- `issue_to_module_terms.json` — mapping used to build charts\n")
    f.write("- `module_deps.json` — module dependency data used for graph\n")
# Save current CSVs into history with timestamp for future trend aggregation.
try:
    import shutil
    if os.path.exists("severity_data.csv"):
        shutil.copy("severity_data.csv", f"history/severity-data-{ts}.csv")
    if os.path.exists("turnaround.csv"):
        shutil.copy("turnaround.csv", f"history/turnaround-{ts}.csv")
except Exception:
    pass
print(f"REPORT_MD={md_path}")
print(f"REPORT_PNG={png_sev}")
print(f"REPORT_HEAT={heat_png}")
print(f"REPORT_TREND={trend_png}")
print(f"REPORT_DEP={dep_png}")
print(f"REPORT_TA={ta_png}")
# Script 6: prune old report sets, keeping the newest 10 timestamps.
import os, re
from pathlib import Path

hist = Path("history")
hist.mkdir(exist_ok=True)
# Reports come in md+png pairs named severity-by-module-YYYYMMDD-HHMMSS.(md|png);
# group files by the timestamp in the name.
pat = re.compile(r"^severity-by-module-(\d{8}-\d{6})\.(md|png)$")
groups = {}
for entry in hist.iterdir():
    match = pat.match(entry.name)
    if match:
        groups.setdefault(match.group(1), []).append(entry)
# Retain only the 10 most recent timestamp groups; delete the rest.
keep = set(sorted(groups, reverse=True)[:10])
drop = [path for stamp, files in groups.items() if stamp not in keep for path in files]
for path in drop:
    path.unlink()
print(f"Kept {len(keep)} report sets; pruned {len(drop)} files.")
---
This produces sample output including the various JSON and CSV files mentioned above. As one example, the printed issue-activity DataFrame looks like:
   metric  count
0  opened      8
1  closed      8
Care must be taken not to run into API rate limits. For example, a throttled request returns:
{"message": "API rate limit exceeded for <client-ip-address>", "documentation_url": "https://docs.github.com/rest/overview/resources-in-the-rest-api#rate-limiting"}
No comments:
Post a Comment