damus

nostr ios client
git clone git://jb55.com/damus
Log | Files | Refs | README | LICENSE

tag_follow_packs.py (13093B)


      1 #!/usr/bin/env python3
      2 """
      3 Nostr Event Updater
      4 
      5 This script fetches Nostr events based on a YAML mapping file, updates them with
      6 'tags' based on the mapping data, and signs them with a specified private key.
      7 Optionally can publish the updated events to a relay.
      8 
      9 Example YAML mapping file format:
     10 ```
     11 # mapping.yaml
     12 "39089:17538dc2a62769d09443f18c37cbe358fab5bbf981173542aa7c5ff171ed77c4:cioc58duuftq": ["farmers", "agriculture"]
     13 "1:abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789:someid": "technology"
     14 ```
     15 
     16 Each key is in the format "kind:pubkey:d-value" and the value is either a single tag string
     17 or a list of tag strings to add.
     18 """
     19 import sys
     20 import json
     21 import argparse
     22 import yaml
     23 import subprocess
     24 import time
     25 import os
     26 from typing import Dict, List, Optional, Tuple, Any, Union
     27 
     28 
     29 def parse_args():
     30     parser = argparse.ArgumentParser(
     31         description="Fetch Nostr events, update them with tags 't' based on a mapping, and sign them with a private key.",
     32         epilog="""
     33 Examples:
     34   # Fetch events, update tags, and print to stdout
     35   ./update_jsonl.py mapping.yaml nsec1...
     36 
     37   # Fetch events, update tags, and publish to a relay
     38   ./update_jsonl.py mapping.yaml nsec1... --publish --relay wss://relay.example.com
     39 
     40   # Fetch events, update tags, save to file, and update timestamps
     41   ./update_jsonl.py mapping.yaml nsec1... --output updated_events.jsonl --update-timestamp
     42 """
     43     )
     44     parser.add_argument(
     45         "map_yaml_file", 
     46         help="Path to the YAML file containing the mapping in format 'kind:pubkey:d-value': [tags]"
     47     )
     48     parser.add_argument(
     49         "private_key", 
     50         help="Private key (hex or nsec format) for signing the updated events."
     51     )
     52     parser.add_argument(
     53         "--relay", 
     54         default="wss://relay.damus.io", 
     55         help="Relay URL to fetch events from and optionally publish to. (default: wss://relay.damus.io)"
     56     )
     57     parser.add_argument(
     58         "--output", 
     59         default=None, 
     60         help="Output file path to save updated events. If not provided, print to stdout."
     61     )
     62     parser.add_argument(
     63         "--publish", 
     64         action="store_true", 
     65         help="Publish updated events to the specified relay."
     66     )
     67     parser.add_argument(
     68         "--update-timestamp", 
     69         action="store_true", 
     70         help="Update event timestamps to current time instead of preserving original timestamps."
     71     )
     72     return parser.parse_args()
     73 
     74 
     75 def split_coordinate(coordinate: str) -> Tuple[int, str, str]:
     76     """Split a coordinate string into kind, pubkey, and d-tag value."""
     77     parts = coordinate.split(":")
     78     if len(parts) != 3:
     79         raise ValueError(f"Invalid coordinate format: {coordinate}")
     80     kind = int(parts[0])
     81     pubkey = parts[1]
     82     d_value = parts[2]
     83     return kind, pubkey, d_value
     84 
     85 
     86 def fetch_event(kind: int, pubkey: str, d_value: str, relay: str) -> Optional[Dict]:
     87     """Fetch an event from the Nostr network using nak CLI.
     88     
     89     Args:
     90         kind: The event kind to fetch
     91         pubkey: The author's public key
     92         d_value: The d-tag value to match
     93         relay: The relay URL to fetch from
     94         
     95     Returns:
     96         The event as a dictionary, or None if not found or error
     97     """
     98     try:
     99         # Check if nak CLI is available
    100         try:
    101             subprocess.run(["nak", "--version"], capture_output=True, check=True)
    102         except (subprocess.CalledProcessError, FileNotFoundError):
    103             sys.stderr.write("Error: 'nak' CLI tool is not available or not in PATH.\n")
    104             sys.stderr.write("Please install it from https://github.com/fiatjaf/nak\n")
    105             sys.exit(1)
    106             
    107         # Prepare the request command
    108         cmd = [
    109             "nak", "req", 
    110             "--kind", str(kind), 
    111             "--author", pubkey, 
    112             "-d", d_value,
    113             relay
    114         ]
    115         
    116         sys.stderr.write(f"Fetching event: kind={kind}, author={pubkey}, d={d_value} from {relay}...\n")
    117         result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    118         
    119         if not result.stdout.strip():
    120             sys.stderr.write(f"No event found for kind={kind}, pubkey={pubkey}, d={d_value}\n")
    121             return None
    122         
    123         event_data = json.loads(result.stdout.strip())
    124         sys.stderr.write(f"Successfully fetched event with ID: {event_data.get('id', 'unknown')}\n")
    125         return event_data
    126         
    127     except subprocess.CalledProcessError as e:
    128         sys.stderr.write(f"Error fetching event: {e}\n")
    129         sys.stderr.write(f"stderr: {e.stderr}\n")
    130         return None
    131     except json.JSONDecodeError as e:
    132         sys.stderr.write(f"Invalid JSON response: {e}\n")
    133         sys.stderr.write(f"Response: {result.stdout}\n")
    134         return None
    135     except Exception as e:
    136         sys.stderr.write(f"Unexpected error fetching event: {e}\n")
    137         return None
    138 
    139 
    140 def get_d_tag(tags: List[List[str]]) -> Optional[str]:
    141     """Find the d-tag value in the event tags."""
    142     for tag in tags:
    143         if tag and len(tag) > 1 and tag[0] == "d":
    144             return tag[1]
    145     return None
    146 
    147 
    148 def update_event_tags(event: Dict, tag_values: List[str]) -> Dict:
    149     """Update the event tags with new t-tags."""
    150     if "tags" not in event:
    151         event["tags"] = []
    152     
    153     # Remove existing t-tags to avoid duplicates
    154     event["tags"] = [tag for tag in event["tags"] if not (tag and tag[0] == "t")]
    155     
    156     # Add new t-tags
    157     for val in tag_values:
    158         event["tags"].append(["t", val])
    159     
    160     return event
    161 
    162 
    163 def sign_and_publish_event(event: Dict, private_key: str, relay: str = None) -> Dict:
    164     """Sign the event with the provided private key using nak and optionally publish it.
    165     
    166     Args:
    167         event: The event to sign
    168         private_key: The private key (hex or nsec format) for signing
    169         relay: Optional relay URL to publish to
    170         
    171     Returns:
    172         The signed event as a dictionary
    173     
    174     Raises:
    175         SystemExit: If signing or publishing fails
    176     """
    177     # Preserve the original event's structure, but remove fields that will be regenerated
    178     # (id, sig, pubkey) as they'll be replaced by the signing process
    179     signing_event = {
    180         "kind": event["kind"],
    181         "created_at": event["created_at"],  # Preserve original timestamp
    182         "content": event["content"],
    183         "tags": event["tags"],
    184     }
    185     
    186     try:
    187         # Set up nak event command with private key
    188         cmd = ["nak", "event", "--sec", private_key]
    189         
    190         # Add relay if publishing is requested
    191         if relay:
    192             cmd.append(relay)
    193             
    194         event_json = json.dumps(signing_event)
    195         
    196         sys.stderr.write(f"Signing event of kind {event['kind']}...\n")
    197         result = subprocess.run(
    198             cmd, 
    199             input=event_json, 
    200             capture_output=True, 
    201             text=True, 
    202             check=True
    203         )
    204         
    205         signed_event = json.loads(result.stdout.strip())
    206         
    207         if relay:
    208             sys.stderr.write(f"Published event to {relay}: {signed_event['id']}\n")
    209         else:
    210             sys.stderr.write(f"Event signed with ID: {signed_event['id']}\n")
    211             
    212         return signed_event
    213     except subprocess.CalledProcessError as e:
    214         sys.stderr.write(f"Error signing/publishing event: {e}\n")
    215         sys.stderr.write(f"stderr: {e.stderr}\n")
    216         sys.exit(1)
    217     except json.JSONDecodeError as e:
    218         sys.stderr.write(f"Invalid JSON in signed event: {e}\n")
    219         sys.stderr.write(f"Response: {result.stdout}\n")
    220         sys.exit(1)
    221     except Exception as e:
    222         sys.stderr.write(f"Unexpected error during signing/publishing: {e}\n")
    223         sys.exit(1)
    224 
    225 
    226 def validate_private_key(private_key: str) -> bool:
    227     """Validate that the provided private key is in a valid format.
    228     
    229     Args:
    230         private_key: The private key string to validate
    231         
    232     Returns:
    233         True if the key format appears valid, False otherwise
    234     """
    235     # Check for nsec format
    236     if private_key.startswith("nsec1"):
    237         return len(private_key) >= 60  # Approx length for nsec keys
    238         
    239     # Check for hex format
    240     if all(c in "0123456789abcdefABCDEF" for c in private_key):
    241         return len(private_key) == 64
    242         
    243     return False
    244 
    245 
    246 def main():
    247     args = parse_args()
    248 
    249     # Validate the private key format
    250     if not validate_private_key(args.private_key):
    251         sys.stderr.write("Error: Invalid private key format. Must be hex (64 chars) or nsec1 format.\n")
    252         sys.exit(1)
    253 
    254     # Check if the mapping file exists
    255     if not os.path.isfile(args.map_yaml_file):
    256         sys.stderr.write(f"Error: Mapping file '{args.map_yaml_file}' does not exist or is not accessible.\n")
    257         sys.exit(1)
    258 
    259     # Load the mapping from the provided YAML file
    260     try:
    261         with open(args.map_yaml_file, "r") as mf:
    262             mapping = yaml.safe_load(mf)
    263             if mapping is None:
    264                 sys.stderr.write(f"Error: Mapping file '{args.map_yaml_file}' is empty or invalid.\n")
    265                 sys.exit(1)
    266     except yaml.YAMLError as e:
    267         sys.stderr.write(f"Error parsing YAML file: {e}\n")
    268         sys.exit(1)
    269     except Exception as e:
    270         sys.stderr.write(f"Error loading mapping file: {e}\n")
    271         sys.exit(1)
    272 
    273     # If the mapping is a list, convert it to a dictionary
    274     if isinstance(mapping, list):
    275         new_mapping = {}
    276         for item in mapping:
    277             if isinstance(item, dict):
    278                 new_mapping.update(item)
    279             else:
    280                 sys.stderr.write(f"Unexpected item in mapping list: {item}\n")
    281         mapping = new_mapping
    282 
    283     # Make sure we have at least one mapping
    284     if not mapping:
    285         sys.stderr.write("Error: No valid mappings found in the YAML file.\n")
    286         sys.exit(1)
    287 
    288     # Prepare output file if specified
    289     output_file = None
    290     if args.output:
    291         try:
    292             output_file = open(args.output, "w")
    293             sys.stderr.write(f"Writing output to '{args.output}'\n")
    294         except Exception as e:
    295             sys.stderr.write(f"Error opening output file: {e}\n")
    296             sys.exit(1)
    297 
    298     updated_events = []
    299     total_events = len(mapping)
    300     
    301     sys.stderr.write(f"Processing {total_events} events from mapping...\n")
    302 
    303     # Process each coordinate in the mapping
    304     for i, (coordinate, tag_values) in enumerate(mapping.items(), 1):
    305         try:
    306             sys.stderr.write(f"[{i}/{total_events}] Processing coordinate: {coordinate}\n")
    307             kind, pubkey, d_value = split_coordinate(coordinate)
    308             
    309             # Fetch the event
    310             event = fetch_event(kind, pubkey, d_value, args.relay)
    311             if not event:
    312                 sys.stderr.write(f"Skipping coordinate {coordinate}: Event not found\n")
    313                 continue
    314             
    315             # Verify the event has the expected d-tag
    316             event_d_tag = get_d_tag(event.get("tags", []))
    317             if event_d_tag != d_value:
    318                 sys.stderr.write(f"Skipping coordinate {coordinate}: D-tag mismatch (expected={d_value}, found={event_d_tag})\n")
    319                 continue
    320             
    321             # Update the event tags
    322             if isinstance(tag_values, list):
    323                 updated_event = update_event_tags(event, tag_values)
    324                 sys.stderr.write(f"Added {len(tag_values)} t-tags: {', '.join(tag_values)}\n")
    325             elif tag_values is not None:
    326                 updated_event = update_event_tags(event, [tag_values])
    327                 sys.stderr.write(f"Added t-tag: {tag_values}\n")
    328             else:
    329                 sys.stderr.write(f"Skipping coordinate {coordinate}: No tag values\n")
    330                 continue
    331             
    332             # Update timestamp if requested
    333             if args.update_timestamp:
    334                 updated_event["created_at"] = int(time.time())
    335                 sys.stderr.write(f"Updated timestamp to current time: {updated_event['created_at']}\n")
    336                 
    337             # Sign the updated event and optionally publish it
    338             signed_event = sign_and_publish_event(
    339                 updated_event, 
    340                 args.private_key,
    341                 args.relay if args.publish else None
    342             )
    343             
    344             # Save or print the updated event
    345             updated_events.append(signed_event)
    346             if output_file:
    347                 output_file.write(json.dumps(signed_event) + "\n")
    348             else:
    349                 print(json.dumps(signed_event))
    350                 
    351         except ValueError as e:
    352             sys.stderr.write(f"Error processing coordinate {coordinate}: {e}\n")
    353             continue
    354         except Exception as e:
    355             sys.stderr.write(f"Unexpected error processing coordinate {coordinate}: {e}\n")
    356             continue
    357 
    358     # Close output file if opened
    359     if output_file:
    360         output_file.close()
    361 
    362     successful = len(updated_events)
    363     failed = total_events - successful
    364     sys.stderr.write(f"Summary: Successfully processed {successful} events, {failed} failed\n")
    365 
    366 
    367 if __name__ == "__main__":
    368     main()