Preparing COBOL Error Data for Machine Learning in SageMaker

Introduction
COBOL and machine learning may sound like opposites: one was born in the era of punch cards, the other in the age of GPU clusters. But when COBOL jobs fail, they generate highly valuable data. In our eks_cobol architecture, every failed job writes an .error.json to S3. These structured error logs become the foundation for supervised learning: training a SageMaker model to predict which jobs will fail in the future.
This article walks through how we preprocess those JSON error files, flatten them into a tabular dataset, and upload a clean CSV to S3 for SageMaker. You’ll get the full code, pipeline architecture, and design decisions—all in one shot.
Problem: Structured Enough to Log, Too Messy for ML
Our error JSONs are structured like this:
{
  "jobId": "e8f3d9d4-1c9b-4c7b-b9e2-f2345a3a9c92",
  "timestamp": "2025-04-03T19:32:10Z",
  "status": "failed",
  "errorType": "DataFormatError",
  "message": "Invalid date format in field 6",
  "inputFile": "customers_202504.csv",
  "line": 42,
  "rawRecord": "A123,John,Doe,04/35/2024,ACTIVE"
}
This is great for debugging, but ML needs a table: rows and columns. So we extract a subset of fields (jobId, inputFile, errorType, message, rawRecord) and add a label: isFailure = True.
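To make that shape concrete, here is a minimal sketch of the flattening step, assuming the error file layout shown above (the production script in the next section goes further and engineers numeric features out of the raw record):

import json

# Minimal sketch: flatten one .error.json into the conceptual tabular row.
# Field names mirror the example above; flatten_error is illustrative only.
def flatten_error(path):
    with open(path, "r") as f:
        err = json.load(f)
    return {
        "jobId": err.get("jobId"),
        "inputFile": err.get("inputFile"),
        "errorType": err.get("errorType"),
        "message": err.get("message"),
        "rawRecord": err.get("rawRecord"),
        "isFailure": True,  # everything in the error bucket is a failure by definition
    }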
The Preprocessing Service
We built a service cronjob that scans all .json files in an error S3 bucket, flattens them, writes them to a CSV, and uploads that CSV to an ML-specific S3 bucket for SageMaker to consume.
import os
import json
import csv
import psycopg2
from datetime import datetime
from glob import glob

LOG_ROOT = "/logs/cobol"
OUTPUT_PATH = "/logs/training/training-data.csv"

PGHOST = os.getenv("POSTGRES_CONNECTION_ENDPOINT")
PGDATABASE = os.getenv("POSTGRES_DATABASE_NAME")
PGUSER = os.getenv("POSTGRES_USER")
PGPASSWORD = os.getenv("POSTGRES_PASSWORD")


def parse_row_string(row):
    # Turn a raw record ("YYYY-MM-DD,category,type,amount") into a feature
    # dict, or return None if it cannot be parsed.
    parts = row.strip().split(",")
    if len(parts) != 4:
        return None
    try:
        date_str, category, tx_type, amount_str = parts
        description = f"{category} {tx_type}"
        dt = datetime.strptime(date_str, "%Y-%m-%d")
        amount = float(amount_str)
        is_zero = 1 if amount == 0.0 or amount == -0.0 else 0
        is_negative_zero = 1 if amount_str.strip() == "-0.0" else 0
        return {
            "year": dt.year,
            "month": dt.month,
            "day": dt.day,
            "day_of_week": dt.weekday(),
            "description": description,
            "amount": amount,
            "is_zero": is_zero,
            "is_negative_zero": is_negative_zero,
        }
    except Exception:
        return None


def process_error_logs():
    # Walk the mounted error-log directory and extract one labeled row per
    # parseable .json error file.
    print("[*] Scanning error logs in /logs/cobol...")
    rows = []
    files = glob(f"{LOG_ROOT}/**/*.json", recursive=True)
    print(f"[*] Found {len(files)} error log files.")
    for file in files:
        try:
            with open(file, "r") as f:
                data = json.load(f)
            row = data.get("row", "")
            parsed = parse_row_string(row)
            if parsed:
                parsed["label"] = 1
                rows.append(parsed)
        except Exception:
            continue
    print(f"[+] Parsed {len(rows)} error rows.")
    return rows


def pull_good_rows(n):
    # Sample n successfully processed transactions from Postgres and build
    # the same feature dict, labeled 0.
    print(f"[*] Connecting to Postgres and pulling {n} good rows...")
    conn = psycopg2.connect(
        host=PGHOST,
        dbname=PGDATABASE,
        user=PGUSER,
        password=PGPASSWORD,
        connect_timeout=10,
    )
    cursor = conn.cursor()
    cursor.execute(
        "SELECT transaction_date, description, amount FROM transactions ORDER BY random() LIMIT %s;",
        (n,),
    )
    rows = []
    for date_str, description, amount in cursor.fetchall():
        try:
            dt = datetime.strptime(str(date_str), "%Y-%m-%d")
            is_zero = 1 if amount == 0.0 or amount == -0.0 else 0
            is_negative_zero = 1 if str(amount).strip() == "-0.0" else 0
            rows.append({
                "year": dt.year,
                "month": dt.month,
                "day": dt.day,
                "day_of_week": dt.weekday(),
                "description": description,
                "amount": amount,
                "is_zero": is_zero,
                "is_negative_zero": is_negative_zero,
                "label": 0,
            })
        except Exception:
            continue
    cursor.close()
    conn.close()
    print(f"[+] Pulled {len(rows)} good rows.")
    return rows


def main():
    print("[*] Starting training data generation...")
    bad_rows = process_error_logs()
    if not bad_rows:
        print("[!] No error logs found. Exiting.")
        return
    # Pull an equal number of good rows so the classes stay balanced.
    good_rows = pull_good_rows(len(bad_rows))
    combined = bad_rows + good_rows
    print(f"[*] Writing combined {len(combined)} rows to {OUTPUT_PATH}...")
    os.makedirs(os.path.dirname(OUTPUT_PATH), exist_ok=True)
    with open(OUTPUT_PATH, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=[
            "year", "month", "day", "day_of_week", "description",
            "amount", "is_zero", "is_negative_zero", "label",
        ])
        writer.writeheader()
        for row in combined:
            writer.writerow(row)
    print("[✓] Training data generation complete.")


if __name__ == "__main__":
    main()
The S3 bucket is mounted to the /logs directory, so the script reads error logs and writes the training CSV through the filesystem rather than calling the S3 API directly.
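If you prefer to push the CSV to the ML bucket explicitly instead of relying on the mount, a minimal boto3 sketch looks like the following; the bucket and key names are assumptions that match the path referenced later in this article:

import boto3

# Assumed bucket/key for illustration; the mount-based approach above
# achieves the same result without an explicit API call.
s3 = boto3.client("s3")
s3.upload_file(
    "/logs/training/training-data.csv",  # local path written by main()
    "my-cobol-ml-dataset",               # assumed ML bucket
    "training/errors_flat.csv",          # assumed key read by SageMaker
)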
Output Format
Conceptually, each flattened error record is flat, clean, and ready to be merged with non-error data:
jobId,inputFile,errorType,message,rawRecord,isFailure
e8f3d9d4-1c9b,customers_202504.csv,DataFormatError,"Invalid date format","A123,John,Doe,04/35/2024,ACTIVE",True
The script above takes the same idea one step further: it parses the raw record into numeric features (year, month, day, day_of_week, description, amount, is_zero, is_negative_zero) and writes a label column instead of isFailure (1 for rows recovered from error logs, 0 for good rows pulled from Postgres). Either way, pairing failed records with successful ones gives you a binary classifier dataset.
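As a quick sanity check before training, you can load the combined CSV and confirm the two classes are balanced; a minimal pandas sketch, using the column names from the script above:

import pandas as pd

# The script samples one good row per error row, so the label column
# should come out roughly 50/50.
df = pd.read_csv("/logs/training/training-data.csv")
print(df["label"].value_counts())
print(df.head())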
Usage in SageMaker
We drop this file into s3://my-cobol-ml-dataset/training/errors_flat.csv, which is picked up by our SageMaker training pipeline. We use this dataset to train a binary classification model using XGBoost to predict error likelihood based on patterns in the input file, job metadata, or record structure.
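The training job itself is outside the scope of this article, but a minimal sketch of the SageMaker side with the built-in XGBoost container looks roughly like this; the role ARN, instance type, container version, and output path are placeholders, not our production values:

import sagemaker
from sagemaker import image_uris
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

session = sagemaker.Session()
region = session.boto_region_name
role = "arn:aws:iam::123456789012:role/SageMakerExecutionRole"  # placeholder role

# Built-in XGBoost container; the version is an assumption.
xgb_image = image_uris.retrieve("xgboost", region, version="1.7-1")

estimator = Estimator(
    image_uri=xgb_image,
    role=role,
    instance_count=1,
    instance_type="ml.m5.large",
    output_path="s3://my-cobol-ml-dataset/models/",  # assumed output location
    sagemaker_session=session,
)
estimator.set_hyperparameters(
    objective="binary:logistic",  # binary target: will this record fail?
    num_round=100,
)

# Note: the built-in XGBoost container expects header-less CSV with the label
# in the first column and numeric features only, so the CSV from the script
# above needs its columns reordered and the description field encoded first.
train_input = TrainingInput(
    "s3://my-cobol-ml-dataset/training/errors_flat.csv",
    content_type="text/csv",
)
estimator.fit({"train": train_input})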
This model gives us a proactive signal. When a new file is ingested, we run a prediction first—if the model predicts a high chance of failure, we flag the file, route it to a validation lane, or skip processing entirely.
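Once a trained model is deployed behind an endpoint, that pre-ingestion check can be as small as the sketch below; the endpoint name, feature encoding, and threshold are assumptions for illustration:

import boto3

runtime = boto3.client("sagemaker-runtime")

# One candidate record, encoded in the same feature order used at training
# time (year, month, day, day_of_week, description, amount, is_zero,
# is_negative_zero); the description here is already numerically encoded.
candidate = "2025,4,3,3,1024,150.25,0,0"

response = runtime.invoke_endpoint(
    EndpointName="cobol-failure-predictor",  # assumed endpoint name
    ContentType="text/csv",
    Body=candidate,
)
score = float(response["Body"].read().decode("utf-8"))

if score > 0.8:  # threshold is a tunable assumption
    print("[!] High failure risk: route to the validation lane.")
else:
    print("[✓] Low failure risk: proceed with normal processing.")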
Conclusion
This is how we take raw COBOL failures and turn them into machine learning fuel. A scheduled preprocessing job flattens error logs from S3 into a single CSV file that SageMaker can consume. From here, it's all optimization: more features, smarter labels, bigger models. But it starts with structured, automated preprocessing.