# Content is user-generated and unverified.
import os
import logging
from typing import Optional, Dict, Any, List
from atproto_client import Client, Session, SessionEvent, models

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger("bluesky_session_handler")

# Load the environment variables
import dotenv
dotenv.load_dotenv(override=True)

import yaml
import json
import re

# Strip fields. A list of fields to remove from a JSON object
# Note: 'facets' removed from this list so we can process links
STRIP_FIELDS = [
    "cid",
    "rev",
    "did",
    "uri",
    "langs",
    "threadgate",
    "py_type",
    "labels",
    "avatar",
    "viewer",
    "indexed_at",
    "tags",
    "associated",
    "thread_context",
    "aspect_ratio",
    "thumb",
    "fullsize",
    "root",
    "created_at",
    "verification",
    "like_count",
    "quote_count",
    "reply_count",
    "repost_count",
    "embedding_disabled",
    "thread_muted",
    "reply_disabled",
    "pinned",
    "like",
    "repost",
    "blocked_by",
    "blocking",
    "blocking_by_list",
    "followed_by",
    "following",
    "known_followers",
    "muted",
    "muted_by_list",
    "root_author_like",
    "entities",
    "ref",
    "mime_type",
    "size",
]

# Facet feature type identifiers checked by the extractors below.
_LINK_FACET_TYPE = 'app.bsky.richtext.facet#link'
_MENTION_FACET_TYPE = 'app.bsky.richtext.facet#mention'


def _first_present(mapping: Dict, *keys: str, default: Any = None) -> Any:
    """Return the first non-None value found under any of *keys* in *mapping*."""
    for key in keys:
        value = mapping.get(key)
        if value is not None:
            return value
    return default


def _extract_facet_features(
    text: str,
    facets: List[Dict],
    feature_type: str,
    source_key: str,
    output_key: str,
    label: str,
) -> List[Dict[str, Any]]:
    """
    Collect every facet feature of one type together with the text it covers.

    Both key spellings are accepted: the wire-format names ('$type',
    'byteStart', 'byteEnd') and the attribute-style names ('py_type',
    'byte_start', 'byte_end'). The latter is what convert_to_basic_types()
    yields, because it serialises objects through ``__dict__``.

    Args:
        text: The post text the facets refer to
        facets: List of facet dictionaries (may be empty or None)
        feature_type: The feature type identifier to select
        source_key: Key in the feature dict holding the target (e.g. 'uri')
        output_key: Key under which the target is stored in each result
        label: Human-readable feature name used in warning messages

    Returns:
        List of dictionaries with 'display_text', *output_key*,
        'byte_start' and 'byte_end'
    """
    results: List[Dict[str, Any]] = []
    if not facets:
        return results

    # Facet offsets are byte offsets into the UTF-8 encoding, not str indices.
    text_bytes = (text or '').encode('utf-8')

    for facet in facets:
        if not isinstance(facet, dict):
            continue

        index = facet.get('index')
        if not isinstance(index, dict):
            index = {}
        features = facet.get('features')
        if not isinstance(features, (list, tuple)):
            continue

        for feature in features:
            if not isinstance(feature, dict):
                continue
            if _first_present(feature, '$type', 'py_type') != feature_type:
                continue

            try:
                start = _first_present(index, 'byteStart', 'byte_start', default=0)
                end = _first_present(index, 'byteEnd', 'byte_end', default=0)
                # Decoding fails if the offsets split a multi-byte character.
                display_text = text_bytes[start:end].decode('utf-8')
            except (UnicodeDecodeError, KeyError, TypeError) as e:
                logger.warning(f"Failed to extract {label} from facet: {e}")
                continue

            results.append({
                'display_text': display_text,
                output_key: feature.get(source_key, ''),
                'byte_start': start,
                'byte_end': end,
            })

    return results


def extract_links_from_facets(text: str, facets: List[Dict]) -> List[Dict[str, Any]]:
    """
    Extract link information from Bluesky facets.

    Args:
        text: The post text
        facets: List of facets from the post

    Returns:
        List of dictionaries containing link information, each with the keys
        'display_text', 'actual_url', 'byte_start' and 'byte_end'
    """
    return _extract_facet_features(
        text, facets, _LINK_FACET_TYPE, 'uri', 'actual_url', 'link'
    )


def extract_mentions_from_facets(text: str, facets: List[Dict]) -> List[Dict[str, Any]]:
    """
    Extract mention information from Bluesky facets.

    Args:
        text: The post text
        facets: List of facets from the post

    Returns:
        List of dictionaries containing mention information, each with the
        keys 'display_text', 'did', 'byte_start' and 'byte_end'
    """
    # NOTE(review): 'did' is listed in STRIP_FIELDS, so strip_fields() removes
    # it from these entries again when thread_to_yaml_string() strips metadata.
    # Confirm whether that is intended.
    return _extract_facet_features(
        text, facets, _MENTION_FACET_TYPE, 'did', 'did', 'mention'
    )


def process_post_facets(post_data: Dict) -> Dict[str, Any]:
    """
    Process facets in a post and extract link/mention information.

    Args:
        post_data: Post data dictionary

    Returns:
        Dictionary with extracted facet information. Contains
        'extracted_links' and/or 'extracted_mentions' only when the
        respective list is non-empty; empty when the post has no facets.
    """
    text = post_data.get('text', '')
    facets = post_data.get('facets', [])

    if not facets:
        return {}

    facet_info: Dict[str, Any] = {}

    links = extract_links_from_facets(text, facets)
    if links:
        facet_info['extracted_links'] = links

    mentions = extract_mentions_from_facets(text, facets)
    if mentions:
        facet_info['extracted_mentions'] = mentions

    return facet_info


def convert_to_basic_types(obj):
    """
    Convert complex Python objects to basic types for JSON/YAML serialization.

    Objects exposing ``__dict__`` are replaced by their (recursively
    converted) attribute dictionary, dicts/lists/tuples are converted
    element-wise (tuples become lists), scalars are returned unchanged, and
    anything else is turned into its ``str()`` form.
    """
    if hasattr(obj, '__dict__'):
        # Convert objects with __dict__ to their dictionary representation
        return convert_to_basic_types(obj.__dict__)
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_to_basic_types(item) for item in obj]
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # For other types, fall back to the string representation
    return str(obj)


def _is_empty_value(value) -> bool:
    """Return True for None, empty dicts/lists and blank strings."""
    if value is None:
        return True
    if isinstance(value, (dict, list)):
        return len(value) == 0
    if isinstance(value, str):
        return value.strip() == ""
    return False


def strip_fields(obj, strip_field_list):
    """
    Recursively strip fields from a JSON object.

    The object is modified in place and also returned. For dicts, keys listed
    in *strip_field_list* and string keys starting with a double underscore
    are removed, then values are processed recursively and entries whose
    value ends up None, empty or blank are dropped. For lists, items are
    processed recursively and None items are removed.
    """
    if isinstance(obj, dict):
        # Remove fields from strip list and dunder-prefixed metadata keys.
        # Only string keys can carry the "__" prefix.
        for field in list(obj.keys()):
            if field in strip_field_list or (
                isinstance(field, str) and field.startswith("__")
            ):
                obj.pop(field, None)

        # Recursively process remaining values; iterate over a snapshot
        # because empty results are removed while looping.
        for key, value in list(obj.items()):
            cleaned = strip_fields(value, strip_field_list)
            if _is_empty_value(cleaned):
                obj.pop(key, None)
            else:
                obj[key] = cleaned

    elif isinstance(obj, list):
        # Slice assignment keeps the caller's list object identity.
        obj[:] = [
            item
            for item in (strip_fields(value, strip_field_list) for value in obj)
            if item is not None
        ]

    return obj


def enhance_post_with_facets(post_data: Dict) -> Dict:
    """
    Enhance post data by extracting facet information and adding it as readable fields.

    The dictionary is modified in place: extracted link/mention lists are
    added and the original 'facets' entry is removed.

    Args:
        post_data: The post data dictionary

    Returns:
        Enhanced post data with facet information (non-dict input is
        returned unchanged)
    """
    if not isinstance(post_data, dict):
        return post_data

    # Extract facet information before the raw facets are dropped
    facet_info = process_post_facets(post_data)
    if facet_info:
        post_data.update(facet_info)

    # The raw facets are no longer needed once the useful info is extracted
    post_data.pop('facets', None)

    return post_data


def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Convert thread data to a YAML-formatted string for LLM parsing.
    Enhanced to preserve link information from facets.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    # First convert complex objects to basic types
    basic_thread = convert_to_basic_types(thread)

    # Process facets before stripping metadata
    def process_item(item):
        if isinstance(item, dict):
            # Anything carrying both 'text' and 'facets' is treated as a post
            if 'text' in item and 'facets' in item:
                item = enhance_post_with_facets(item)
            # Iterate over a snapshot of the keys while values are replaced
            for key in list(item):
                item[key] = process_item(item[key])
        elif isinstance(item, list):
            item = [process_item(sub_item) for sub_item in item]
        return item

    enhanced_thread = process_item(basic_thread)

    if strip_metadata:
        cleaned_thread = strip_fields(enhanced_thread, STRIP_FIELDS)
    else:
        cleaned_thread = enhanced_thread

    return yaml.dump(cleaned_thread, indent=2, allow_unicode=True, default_flow_style=False)


def get_session(username: str) -> Optional[str]:
    """
    Read the stored session string for *username*.

    Returns:
        The contents of ``session_<username>.txt`` or None if that file does
        not exist
    """
    try:
        with open(f"session_{username}.txt", encoding="UTF-8") as f:
            return f.read()
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None

def save_session(username: str, session_string: str) -> None:
    """Write *session_string* to ``session_<username>.txt``, replacing any previous content."""
    with open(f"session_{username}.txt", "w", encoding="UTF-8") as f:
        f.write(session_string)
    logger.debug(f"Session saved for {username}")


def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Log a session event and persist the session when it was created or refreshed."""
    logger.info(f"Session changed: {event} {repr(session)}")
    if event in (SessionEvent.CREATE, SessionEvent.REFRESH):
        logger.info(f"Saving changed session for {username}")
        save_session(username, session.export())


def init_client(username: str, password: str) -> Client:
    """
    Create a client for the configured PDS and log it in.

    The PDS address is read from the PDS_URI environment variable and falls
    back to https://bsky.social. A stored session is reused when available;
    if it is rejected, or if none exists, a password login is performed.

    Args:
        username: Account identifier used for login and for the session file name
        password: Account password used when no usable stored session exists

    Returns:
        The logged-in client
    """
    pds_uri = os.getenv("PDS_URI")
    if pds_uri is None:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    # Print the PDS URI
    logger.info(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.info(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # A stale or corrupt session file must not block login for good;
            # fall through to a fresh password login instead.
            logger.warning(
                f"Stored session for {username} could not be reused ({e}); logging in with password instead"
            )

    logger.info(f"Creating new session for {username}")
    client.login(username, password)
    return client


def default_login() -> Client:
    """
    Log in with the credentials from BSKY_USERNAME and BSKY_PASSWORD.

    Returns:
        The logged-in client

    Raises:
        SystemExit: With status 1 if either environment variable is missing
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    if username is None:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        # Non-zero status so callers/shells can detect the failure.
        raise SystemExit(1)

    if password is None:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        raise SystemExit(1)

    return init_client(username, password)


# Patterns for auto-detecting rich-text spans in the UTF-8 bytes of a post.
# "(?:^|\W)" lets a match begin at the very start of the text or after any
# non-word byte; group 1 is the span that becomes the facet.
_MENTION_RE = re.compile(
    rb"(?:^|\W)(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
)
_URL_RE = re.compile(
    rb"(?:^|\W)(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
)


def _build_reply_facets(client: Client, text: str) -> list:
    """Build mention and link facets for *text*; mentions that cannot be resolved are skipped."""
    facets = []
    text_bytes = text.encode("UTF-8")

    # Mentions: resolve each handle to a DID via the profile lookup
    for m in _MENTION_RE.finditer(text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        try:
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
        except Exception as e:
            # Best effort: an unknown handle simply stays plain text
            logger.debug(f"Failed to resolve handle {handle}: {e}")
            continue
        if resolve_resp and hasattr(resolve_resp, 'did'):
            facets.append(
                models.AppBskyRichtextFacet.Main(
                    index=models.AppBskyRichtextFacet.ByteSlice(
                        byteStart=m.start(1),
                        byteEnd=m.end(1)
                    ),
                    features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                )
            )

    # URLs: the matched text itself is the link target
    for m in _URL_RE.finditer(text_bytes):
        url = m.group(1).decode("UTF-8")
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)]
            )
        )

    return facets


def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Mentions (@handle) and http(s) URLs in the text are detected and sent as
    facets, including when they appear at the very start of the text.

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None, uses reply_to_uri
        root_cid: The CID of the root post (if replying to a reply). If None, uses reply_to_cid

    Returns:
        The response from sending the post
    """
    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions and URLs)
    facets = _build_reply_facets(client, text)

    # An empty facet list is sent as None, i.e. as if no facets were given
    response = client.send_post(
        text=text,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        facets=facets or None
    )

    logger.info(f"Reply sent successfully: {response.uri}")
    return response


def get_post_thread(client: Client, uri: str) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post

    Returns:
        The thread data or None if not found
    """
    try:
        return client.app.bsky.feed.get_post_thread({'uri': uri, 'parent_height': 60, 'depth': 10})
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None


def _find_thread_root(thread: Any, default_uri: str, default_cid: str) -> tuple:
    """Return (uri, cid) of the thread's root post, or the defaults if it cannot be determined."""
    # Prefer the root reference stored on the post's own record: it stays
    # correct even when ancestors are missing from the fetched thread.
    # NOTE(review): assumes the post record exposes reply.root with uri/cid —
    # confirm against the SDK models; the walk below is the fallback.
    record = getattr(getattr(thread, 'post', None), 'record', None)
    declared_root = getattr(getattr(record, 'reply', None), 'root', None)
    declared_uri = getattr(declared_root, 'uri', None)
    declared_cid = getattr(declared_root, 'cid', None)
    if declared_uri and declared_cid:
        return declared_uri, declared_cid

    if not getattr(thread, 'parent', None):
        return default_uri, default_cid

    # Keep going up until we reach the topmost fetched ancestor
    current = thread
    while hasattr(current, 'parent') and current.parent:
        current = current.parent

    if hasattr(current, 'post') and hasattr(current.post, 'uri') and hasattr(current.post, 'cid'):
        return current.post.uri, current.post.cid
    return default_uri, default_cid


def reply_to_notification(client: Client, notification: Any, reply_text: str) -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
        reply_text: The text to reply with

    Returns:
        The response from sending the reply or None if failed
    """
    try:
        # Get the post URI and CID from the notification (handle both dict and object)
        if isinstance(notification, dict):
            post_uri = notification.get('uri')
            post_cid = notification.get('cid')
        elif hasattr(notification, 'uri') and hasattr(notification, 'cid'):
            post_uri = notification.uri
            post_cid = notification.cid
        else:
            post_uri = None
            post_cid = None

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Get the thread to find the root post
        thread_data = get_post_thread(client, post_uri)

        if not (thread_data and hasattr(thread_data, 'thread')):
            # If we can't get thread data, just reply directly
            return reply_to_post(
                client=client,
                text=reply_text,
                reply_to_uri=post_uri,
                reply_to_cid=post_cid
            )

        root_uri, root_cid = _find_thread_root(thread_data.thread, post_uri, post_cid)

        # Reply to the notification
        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid
        )

    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None


if __name__ == "__main__":
    client = default_login()

    # Example usage: Test facet extraction
    test_uri = "at://did:example/app.bsky.feed.post/example123"
    thread_data = get_post_thread(client, test_uri)

    if thread_data:
        yaml_output = thread_to_yaml_string(thread_data)
        print("Enhanced YAML output with extracted links:")
        print(yaml_output)

    logger.info("Client is ready to use!")
# Content is user-generated and unverified.
# enhanced_bluesky_session_handler.py | Claude