Spaces:
Sleeping
Sleeping
| import argparse | |
| import logging | |
| import os | |
| import re | |
| import sys | |
| from typing import List, Optional | |
| from pathlib import Path | |
| from logging_utils import setup_logging | |
| from agent import BIDSifierAgent | |
| from prompts import _ctx | |
| def _read_pdf(path: str) -> str: | |
| """Extract text from a PDF file using pypdf.""" | |
| try: | |
| from pypdf import PdfReader | |
| except ImportError as e: | |
| raise RuntimeError( | |
| "Reading PDFs requires the 'pypdf' package. Install it with: pip install pypdf" | |
| ) from e | |
| text_parts: List[str] = [] | |
| with open(path, "rb") as f: | |
| reader = PdfReader(f) | |
| for i, page in enumerate(reader.pages): | |
| try: | |
| text = page.extract_text() or "" | |
| except Exception: | |
| text = "" | |
| if text.strip(): | |
| # Add lightweight page markers to help the LLM | |
| text_parts.append(f"\n\n=== Page {i+1} ===\n{text.strip()}") | |
| return "\n".join(text_parts).strip() | |
| def _read_optional(path: Optional[str]) -> Optional[str]: | |
| if not path: | |
| return None | |
| if not os.path.isfile(path): | |
| raise FileNotFoundError(f"File not found: {path}") | |
| ext = os.path.splitext(path)[1].lower() | |
| if ext == ".pdf": | |
| return _read_pdf(path) | |
| with open(path, "r", encoding="utf-8", errors="ignore") as f: | |
| return f.read() | |
| def parse_commands_from_markdown(markdown: str) -> List[str]: | |
| """Extract the first bash/sh fenced code block and return one command per line.""" | |
| pattern = re.compile(r"```(?:bash|sh)\n(.*?)```", re.DOTALL | re.IGNORECASE) | |
| m = pattern.search(markdown) | |
| if not m: | |
| return [] | |
| block = m.group(1) | |
| commands: List[str] = [] | |
| for raw in block.splitlines(): | |
| line = raw.strip() | |
| if not line or line.startswith("#"): | |
| continue | |
| commands.append(line) | |
| return commands | |
| def _print_commands(commands: List[str]) -> None: | |
| if not commands: | |
| print("(No commands detected in fenced bash block.)") | |
| return | |
| print("-----"*10) | |
| print("COMMANDS TO EXECUTE:") | |
| print("-----"*10) | |
| for c in commands: | |
| print(f" {c}") | |
| def prompt_yes_no(question: str, default: bool = False) -> bool: | |
| suffix = "[Y/n]" if default else "[y/N]" | |
| ans = input(f"{question} {suffix} ").strip().lower() | |
| if not ans: | |
| return default | |
| return ans in {"y", "yes"} | |
| def short_divider(title: str) -> None: | |
| print("\n" + "=" * 80) | |
| print(title) | |
| print("=" * 80 + "\n") | |
| def enter_feedback_loop(agent: BIDSifierAgent, context: dict, last_model_reply: str, logger: Optional[logging.Logger] = None) -> dict: | |
| feedback = input("\nAny comments or corrections to the summary? (press Enter to skip): ").strip() | |
| while feedback: | |
| if logger: | |
| logger.info("User feedback: %s", feedback) | |
| context["user_feedback"] += feedback | |
| ctx = f"\n{_ctx(context['dataset_xml'], context['readme_text'], context['publication_text'])}" | |
| query = f"Tackle the user feedback. \n ### Context:### {ctx} \n ### Your previous message:### {last_model_reply} \n ### User feedback:### {feedback} \n ###Output:###" | |
| agent_response = agent.run_query(query) | |
| print(agent_response) | |
| last_model_reply = agent_response | |
| feedback = input("\nAny additional comments or corrections? (press Enter to skip): ").strip() | |
| return context | |
| def main(argv: Optional[List[str]] = None) -> int: | |
| parser = argparse.ArgumentParser( | |
| prog="bidsifier", | |
| description="Interactive LLM assistant to convert a dataset into BIDS via stepwise shell commands.", | |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| ) | |
| parser.add_argument("--dataset-xml", dest="dataset_xml_path", help="Path to dataset structure XML", required=False) | |
| parser.add_argument("--readme", dest="readme_path", help="Path to dataset README file", required=False) | |
| parser.add_argument("--publication", dest="publication_path", help="Path to a publication/notes file", required=False) | |
| parser.add_argument("--output-root", dest="output_root", help="Target BIDS root directory", required=True) | |
| parser.add_argument("--provider", dest="provider", help="Provider name or identifier, default OpeanAI", required=False, default="openai") | |
| parser.add_argument("--model", dest="model", help="Model name to use", default=os.getenv("BIDSIFIER_MODEL", "gpt-4o-mini")) | |
| parser.add_argument("--project", dest="project", help="Project name for log file prefix", required=False) | |
| # Execution is intentionally disabled; we only display commands. | |
| # Keeping --dry-run for backward compatibility (no effect other than display). | |
| parser.add_argument("--dry-run", dest="dry_run", help="Display-only (default behavior)", action="store_true") | |
| args = parser.parse_args(argv) | |
| project_name = args.project or Path(args.output_root).name or Path(os.getcwd()).name | |
| logger, _listener = setup_logging(project_name=project_name) | |
| logger.info("Initialized logging for project '%s'", project_name) | |
| dataset_xml = _read_optional(args.dataset_xml_path) | |
| readme_text = _read_optional(args.readme_path) | |
| publication_text = _read_optional(args.publication_path) | |
| context = { | |
| "dataset_xml": dataset_xml, | |
| "readme_text": readme_text, | |
| "publication_text": publication_text, | |
| "output_root": args.output_root, | |
| "user_feedback": "", | |
| } | |
| command_env = { | |
| "OUTPUT_ROOT": args.output_root, | |
| } | |
| if args.dataset_xml_path: | |
| command_env["DATASET_XML_PATH"] = os.path.abspath(args.dataset_xml_path) | |
| if args.readme_path: | |
| command_env["README_PATH"] = os.path.abspath(args.readme_path) | |
| if args.publication_path: | |
| command_env["PUBLICATION_PATH"] = os.path.abspath(args.publication_path) | |
| agent = BIDSifierAgent(provider=args.provider, model=args.model) | |
| short_divider("Step 1: Understand dataset") | |
| summary = agent.run_step("summary", context) | |
| print(summary) | |
| logger.info(summary) | |
| logger.info("Summary step completed (length=%d chars)", len(summary)) | |
| context = enter_feedback_loop(agent, context, logger) | |
| if not prompt_yes_no("Proceed to create BIDS root?", default=True): | |
| logger.info("User aborted after summary step.") | |
| return 0 | |
| short_divider("Step 2: Propose commands to create metadata files") | |
| meta_plan = agent.run_step("create_metadata", context) | |
| print(meta_plan) | |
| cmds = parse_commands_from_markdown(meta_plan) | |
| _print_commands(cmds) | |
| logger.info("Metadata plan produced %s", cmds) | |
| logger.info("Metadata plan produced %d commands", len(cmds)) | |
| context = enter_feedback_loop(agent, context, logger) | |
| if not prompt_yes_no("Proceed to create empty BIDS structure?", default=True): | |
| logger.info("User aborted after metadata plan.") | |
| return 0 | |
| short_divider("Step 3: Propose commands to create dataset structure") | |
| struct_plan = agent.run_step("create_structure", context) | |
| print(struct_plan) | |
| cmds = parse_commands_from_markdown(struct_plan) | |
| _print_commands(cmds) | |
| logger.info("Structure plan produced %s", cmds) | |
| logger.info("Structure plan produced %d commands", len(cmds)) | |
| context = enter_feedback_loop(agent, context, logger) | |
| if not prompt_yes_no("Proceed to propose renaming/moving?", default=True): | |
| logger.info("User aborted after structure plan.") | |
| return 0 | |
| short_divider("Step 4: Propose commands to rename/move files") | |
| move_plan = agent.run_step("rename_move", context) | |
| print(move_plan) | |
| cmds = parse_commands_from_markdown(move_plan) | |
| _print_commands(cmds) | |
| logger.info("Rename/move plan produced %s", cmds) | |
| logger.info("Rename/move plan produced %d commands", len(cmds)) | |
| context = enter_feedback_loop(agent, context, logger) | |
| print("\nAll steps completed. Commands were only displayed - use them manually") | |
| logger.info("All steps completed successfully.") | |
| return 0 | |
| if __name__ == "__main__": | |
| sys.exit(main()) | |