tag_follow_packs.py (13093B)
1 #!/usr/bin/env python3 2 """ 3 Nostr Event Updater 4 5 This script fetches Nostr events based on a YAML mapping file, updates them with 6 'tags' based on the mapping data, and signs them with a specified private key. 7 Optionally can publish the updated events to a relay. 8 9 Example YAML mapping file format: 10 ``` 11 # mapping.yaml 12 "39089:17538dc2a62769d09443f18c37cbe358fab5bbf981173542aa7c5ff171ed77c4:cioc58duuftq": ["farmers", "agriculture"] 13 "1:abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789:someid": "technology" 14 ``` 15 16 Each key is in the format "kind:pubkey:d-value" and the value is either a single tag string 17 or a list of tag strings to add. 18 """ 19 import sys 20 import json 21 import argparse 22 import yaml 23 import subprocess 24 import time 25 import os 26 from typing import Dict, List, Optional, Tuple, Any, Union 27 28 29 def parse_args(): 30 parser = argparse.ArgumentParser( 31 description="Fetch Nostr events, update them with tags 't' based on a mapping, and sign them with a private key.", 32 epilog=""" 33 Examples: 34 # Fetch events, update tags, and print to stdout 35 ./update_jsonl.py mapping.yaml nsec1... 36 37 # Fetch events, update tags, and publish to a relay 38 ./update_jsonl.py mapping.yaml nsec1... --publish --relay wss://relay.example.com 39 40 # Fetch events, update tags, save to file, and update timestamps 41 ./update_jsonl.py mapping.yaml nsec1... --output updated_events.jsonl --update-timestamp 42 """ 43 ) 44 parser.add_argument( 45 "map_yaml_file", 46 help="Path to the YAML file containing the mapping in format 'kind:pubkey:d-value': [tags]" 47 ) 48 parser.add_argument( 49 "private_key", 50 help="Private key (hex or nsec format) for signing the updated events." 51 ) 52 parser.add_argument( 53 "--relay", 54 default="wss://relay.damus.io", 55 help="Relay URL to fetch events from and optionally publish to. (default: wss://relay.damus.io)" 56 ) 57 parser.add_argument( 58 "--output", 59 default=None, 60 help="Output file path to save updated events. If not provided, print to stdout." 61 ) 62 parser.add_argument( 63 "--publish", 64 action="store_true", 65 help="Publish updated events to the specified relay." 66 ) 67 parser.add_argument( 68 "--update-timestamp", 69 action="store_true", 70 help="Update event timestamps to current time instead of preserving original timestamps." 71 ) 72 return parser.parse_args() 73 74 75 def split_coordinate(coordinate: str) -> Tuple[int, str, str]: 76 """Split a coordinate string into kind, pubkey, and d-tag value.""" 77 parts = coordinate.split(":") 78 if len(parts) != 3: 79 raise ValueError(f"Invalid coordinate format: {coordinate}") 80 kind = int(parts[0]) 81 pubkey = parts[1] 82 d_value = parts[2] 83 return kind, pubkey, d_value 84 85 86 def fetch_event(kind: int, pubkey: str, d_value: str, relay: str) -> Optional[Dict]: 87 """Fetch an event from the Nostr network using nak CLI. 88 89 Args: 90 kind: The event kind to fetch 91 pubkey: The author's public key 92 d_value: The d-tag value to match 93 relay: The relay URL to fetch from 94 95 Returns: 96 The event as a dictionary, or None if not found or error 97 """ 98 try: 99 # Check if nak CLI is available 100 try: 101 subprocess.run(["nak", "--version"], capture_output=True, check=True) 102 except (subprocess.CalledProcessError, FileNotFoundError): 103 sys.stderr.write("Error: 'nak' CLI tool is not available or not in PATH.\n") 104 sys.stderr.write("Please install it from https://github.com/fiatjaf/nak\n") 105 sys.exit(1) 106 107 # Prepare the request command 108 cmd = [ 109 "nak", "req", 110 "--kind", str(kind), 111 "--author", pubkey, 112 "-d", d_value, 113 relay 114 ] 115 116 sys.stderr.write(f"Fetching event: kind={kind}, author={pubkey}, d={d_value} from {relay}...\n") 117 result = subprocess.run(cmd, capture_output=True, text=True, check=True) 118 119 if not result.stdout.strip(): 120 sys.stderr.write(f"No event found for kind={kind}, pubkey={pubkey}, d={d_value}\n") 121 return None 122 123 event_data = json.loads(result.stdout.strip()) 124 sys.stderr.write(f"Successfully fetched event with ID: {event_data.get('id', 'unknown')}\n") 125 return event_data 126 127 except subprocess.CalledProcessError as e: 128 sys.stderr.write(f"Error fetching event: {e}\n") 129 sys.stderr.write(f"stderr: {e.stderr}\n") 130 return None 131 except json.JSONDecodeError as e: 132 sys.stderr.write(f"Invalid JSON response: {e}\n") 133 sys.stderr.write(f"Response: {result.stdout}\n") 134 return None 135 except Exception as e: 136 sys.stderr.write(f"Unexpected error fetching event: {e}\n") 137 return None 138 139 140 def get_d_tag(tags: List[List[str]]) -> Optional[str]: 141 """Find the d-tag value in the event tags.""" 142 for tag in tags: 143 if tag and len(tag) > 1 and tag[0] == "d": 144 return tag[1] 145 return None 146 147 148 def update_event_tags(event: Dict, tag_values: List[str]) -> Dict: 149 """Update the event tags with new t-tags.""" 150 if "tags" not in event: 151 event["tags"] = [] 152 153 # Remove existing t-tags to avoid duplicates 154 event["tags"] = [tag for tag in event["tags"] if not (tag and tag[0] == "t")] 155 156 # Add new t-tags 157 for val in tag_values: 158 event["tags"].append(["t", val]) 159 160 return event 161 162 163 def sign_and_publish_event(event: Dict, private_key: str, relay: str = None) -> Dict: 164 """Sign the event with the provided private key using nak and optionally publish it. 165 166 Args: 167 event: The event to sign 168 private_key: The private key (hex or nsec format) for signing 169 relay: Optional relay URL to publish to 170 171 Returns: 172 The signed event as a dictionary 173 174 Raises: 175 SystemExit: If signing or publishing fails 176 """ 177 # Preserve the original event's structure, but remove fields that will be regenerated 178 # (id, sig, pubkey) as they'll be replaced by the signing process 179 signing_event = { 180 "kind": event["kind"], 181 "created_at": event["created_at"], # Preserve original timestamp 182 "content": event["content"], 183 "tags": event["tags"], 184 } 185 186 try: 187 # Set up nak event command with private key 188 cmd = ["nak", "event", "--sec", private_key] 189 190 # Add relay if publishing is requested 191 if relay: 192 cmd.append(relay) 193 194 event_json = json.dumps(signing_event) 195 196 sys.stderr.write(f"Signing event of kind {event['kind']}...\n") 197 result = subprocess.run( 198 cmd, 199 input=event_json, 200 capture_output=True, 201 text=True, 202 check=True 203 ) 204 205 signed_event = json.loads(result.stdout.strip()) 206 207 if relay: 208 sys.stderr.write(f"Published event to {relay}: {signed_event['id']}\n") 209 else: 210 sys.stderr.write(f"Event signed with ID: {signed_event['id']}\n") 211 212 return signed_event 213 except subprocess.CalledProcessError as e: 214 sys.stderr.write(f"Error signing/publishing event: {e}\n") 215 sys.stderr.write(f"stderr: {e.stderr}\n") 216 sys.exit(1) 217 except json.JSONDecodeError as e: 218 sys.stderr.write(f"Invalid JSON in signed event: {e}\n") 219 sys.stderr.write(f"Response: {result.stdout}\n") 220 sys.exit(1) 221 except Exception as e: 222 sys.stderr.write(f"Unexpected error during signing/publishing: {e}\n") 223 sys.exit(1) 224 225 226 def validate_private_key(private_key: str) -> bool: 227 """Validate that the provided private key is in a valid format. 228 229 Args: 230 private_key: The private key string to validate 231 232 Returns: 233 True if the key format appears valid, False otherwise 234 """ 235 # Check for nsec format 236 if private_key.startswith("nsec1"): 237 return len(private_key) >= 60 # Approx length for nsec keys 238 239 # Check for hex format 240 if all(c in "0123456789abcdefABCDEF" for c in private_key): 241 return len(private_key) == 64 242 243 return False 244 245 246 def main(): 247 args = parse_args() 248 249 # Validate the private key format 250 if not validate_private_key(args.private_key): 251 sys.stderr.write("Error: Invalid private key format. Must be hex (64 chars) or nsec1 format.\n") 252 sys.exit(1) 253 254 # Check if the mapping file exists 255 if not os.path.isfile(args.map_yaml_file): 256 sys.stderr.write(f"Error: Mapping file '{args.map_yaml_file}' does not exist or is not accessible.\n") 257 sys.exit(1) 258 259 # Load the mapping from the provided YAML file 260 try: 261 with open(args.map_yaml_file, "r") as mf: 262 mapping = yaml.safe_load(mf) 263 if mapping is None: 264 sys.stderr.write(f"Error: Mapping file '{args.map_yaml_file}' is empty or invalid.\n") 265 sys.exit(1) 266 except yaml.YAMLError as e: 267 sys.stderr.write(f"Error parsing YAML file: {e}\n") 268 sys.exit(1) 269 except Exception as e: 270 sys.stderr.write(f"Error loading mapping file: {e}\n") 271 sys.exit(1) 272 273 # If the mapping is a list, convert it to a dictionary 274 if isinstance(mapping, list): 275 new_mapping = {} 276 for item in mapping: 277 if isinstance(item, dict): 278 new_mapping.update(item) 279 else: 280 sys.stderr.write(f"Unexpected item in mapping list: {item}\n") 281 mapping = new_mapping 282 283 # Make sure we have at least one mapping 284 if not mapping: 285 sys.stderr.write("Error: No valid mappings found in the YAML file.\n") 286 sys.exit(1) 287 288 # Prepare output file if specified 289 output_file = None 290 if args.output: 291 try: 292 output_file = open(args.output, "w") 293 sys.stderr.write(f"Writing output to '{args.output}'\n") 294 except Exception as e: 295 sys.stderr.write(f"Error opening output file: {e}\n") 296 sys.exit(1) 297 298 updated_events = [] 299 total_events = len(mapping) 300 301 sys.stderr.write(f"Processing {total_events} events from mapping...\n") 302 303 # Process each coordinate in the mapping 304 for i, (coordinate, tag_values) in enumerate(mapping.items(), 1): 305 try: 306 sys.stderr.write(f"[{i}/{total_events}] Processing coordinate: {coordinate}\n") 307 kind, pubkey, d_value = split_coordinate(coordinate) 308 309 # Fetch the event 310 event = fetch_event(kind, pubkey, d_value, args.relay) 311 if not event: 312 sys.stderr.write(f"Skipping coordinate {coordinate}: Event not found\n") 313 continue 314 315 # Verify the event has the expected d-tag 316 event_d_tag = get_d_tag(event.get("tags", [])) 317 if event_d_tag != d_value: 318 sys.stderr.write(f"Skipping coordinate {coordinate}: D-tag mismatch (expected={d_value}, found={event_d_tag})\n") 319 continue 320 321 # Update the event tags 322 if isinstance(tag_values, list): 323 updated_event = update_event_tags(event, tag_values) 324 sys.stderr.write(f"Added {len(tag_values)} t-tags: {', '.join(tag_values)}\n") 325 elif tag_values is not None: 326 updated_event = update_event_tags(event, [tag_values]) 327 sys.stderr.write(f"Added t-tag: {tag_values}\n") 328 else: 329 sys.stderr.write(f"Skipping coordinate {coordinate}: No tag values\n") 330 continue 331 332 # Update timestamp if requested 333 if args.update_timestamp: 334 updated_event["created_at"] = int(time.time()) 335 sys.stderr.write(f"Updated timestamp to current time: {updated_event['created_at']}\n") 336 337 # Sign the updated event and optionally publish it 338 signed_event = sign_and_publish_event( 339 updated_event, 340 args.private_key, 341 args.relay if args.publish else None 342 ) 343 344 # Save or print the updated event 345 updated_events.append(signed_event) 346 if output_file: 347 output_file.write(json.dumps(signed_event) + "\n") 348 else: 349 print(json.dumps(signed_event)) 350 351 except ValueError as e: 352 sys.stderr.write(f"Error processing coordinate {coordinate}: {e}\n") 353 continue 354 except Exception as e: 355 sys.stderr.write(f"Unexpected error processing coordinate {coordinate}: {e}\n") 356 continue 357 358 # Close output file if opened 359 if output_file: 360 output_file.close() 361 362 successful = len(updated_events) 363 failed = total_events - successful 364 sys.stderr.write(f"Summary: Successfully processed {successful} events, {failed} failed\n") 365 366 367 if __name__ == "__main__": 368 main()