import os
import logging
from typing import Optional, Dict, Any, List
from atproto_client import Client, Session, SessionEvent, models
# Root logging setup: INFO and above, each record stamped with time,
# logger name and level. The module logger below is used by every helper.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("bluesky_session_handler")
# Load the environment variables
import dotenv
dotenv.load_dotenv(override=True)
import yaml
import json
import re
# Keys that are dropped (at any nesting depth) when a thread is cleaned up
# for output. 'facets' is intentionally NOT listed here: facets have to
# survive long enough for their link information to be processed.
STRIP_FIELDS = [
    "cid", "rev", "did", "uri", "langs", "threadgate", "py_type",
    "labels", "avatar", "viewer", "indexed_at", "tags", "associated",
    "thread_context", "aspect_ratio", "thumb", "fullsize", "root",
    "created_at", "verification",
    "like_count", "quote_count", "reply_count", "repost_count",
    "embedding_disabled", "thread_muted", "reply_disabled", "pinned",
    "like", "repost",
    "blocked_by", "blocking", "blocking_by_list",
    "followed_by", "following", "known_followers",
    "muted", "muted_by_list", "root_author_like",
    "entities", "ref", "mime_type", "size",
]
def extract_links_from_facets(text: str, facets: List[Dict]) -> List[Dict[str, Any]]:
    """
    Extract link information from Bluesky facets.

    Two key spellings are accepted for every facet: the wire/JSON form
    (``$type``, ``byteStart``, ``byteEnd``) and the snake_case form
    (``py_type``, ``byte_start``, ``byte_end``). The snake_case form is what
    is presumably seen when SDK model objects have been flattened through
    their attribute dictionaries -- confirm against the atproto models.

    Args:
        text: The post text. Facet offsets are byte offsets into its UTF-8
            encoding, not character offsets.
        facets: List of facet dictionaries from the post. ``None`` or an
            empty list yields an empty result; non-dict entries are skipped.

    Returns:
        List of dictionaries, one per link feature, with the keys
        ``display_text``, ``actual_url``, ``byte_start`` and ``byte_end``.
    """
    links: List[Dict[str, Any]] = []
    if not facets:
        return links

    text_bytes = (text or '').encode('utf-8')
    for facet in facets:
        if not isinstance(facet, dict):
            continue
        features = facet.get('features') or []
        index = facet.get('index')
        if not isinstance(index, dict):
            # A missing/None index behaves like an empty one (offsets 0..0).
            index = {}

        for feature in features:
            if not isinstance(feature, dict):
                continue
            feature_type = feature.get('$type') or feature.get('py_type')
            if feature_type != 'app.bsky.richtext.facet#link':
                continue

            raw_start = index.get('byteStart', index.get('byte_start'))
            raw_end = index.get('byteEnd', index.get('byte_end'))
            start = 0 if raw_start is None else raw_start
            end = 0 if raw_end is None else raw_end
            try:
                # Slice the encoded text: offsets count bytes, so slicing
                # the str directly would be wrong for non-ASCII posts.
                display_text = text_bytes[start:end].decode('utf-8')
            except (UnicodeDecodeError, TypeError) as e:
                logger.warning(f"Failed to extract link from facet: {e}")
                continue

            links.append({
                'display_text': display_text,
                'actual_url': feature.get('uri', ''),
                'byte_start': start,
                'byte_end': end
            })
    return links
def extract_mentions_from_facets(text: str, facets: List[Dict]) -> List[Dict[str, Any]]:
    """
    Extract mention information from Bluesky facets.

    Two key spellings are accepted for every facet: the wire/JSON form
    (``$type``, ``byteStart``, ``byteEnd``) and the snake_case form
    (``py_type``, ``byte_start``, ``byte_end``). The snake_case form is what
    is presumably seen when SDK model objects have been flattened through
    their attribute dictionaries -- confirm against the atproto models.

    Args:
        text: The post text. Facet offsets are byte offsets into its UTF-8
            encoding, not character offsets.
        facets: List of facet dictionaries from the post. ``None`` or an
            empty list yields an empty result; non-dict entries are skipped.

    Returns:
        List of dictionaries, one per mention feature, with the keys
        ``display_text`` (the mentioned text, normally including the ``@``),
        ``did``, ``byte_start`` and ``byte_end``.
    """
    mentions: List[Dict[str, Any]] = []
    if not facets:
        return mentions

    text_bytes = (text or '').encode('utf-8')
    for facet in facets:
        if not isinstance(facet, dict):
            continue
        features = facet.get('features') or []
        index = facet.get('index')
        if not isinstance(index, dict):
            # A missing/None index behaves like an empty one (offsets 0..0).
            index = {}

        for feature in features:
            if not isinstance(feature, dict):
                continue
            feature_type = feature.get('$type') or feature.get('py_type')
            if feature_type != 'app.bsky.richtext.facet#mention':
                continue

            raw_start = index.get('byteStart', index.get('byte_start'))
            raw_end = index.get('byteEnd', index.get('byte_end'))
            start = 0 if raw_start is None else raw_start
            end = 0 if raw_end is None else raw_end
            try:
                # Slice the encoded text: offsets count bytes, so slicing
                # the str directly would be wrong for non-ASCII posts.
                display_text = text_bytes[start:end].decode('utf-8')
            except (UnicodeDecodeError, TypeError) as e:
                logger.warning(f"Failed to extract mention from facet: {e}")
                continue

            mentions.append({
                'display_text': display_text,
                'did': feature.get('did', ''),
                'byte_start': start,
                'byte_end': end
            })
    return mentions
def process_post_facets(post_data: Dict) -> Dict[str, Any]:
    """
    Collect the link and mention details described by a post's facets.

    Args:
        post_data: Post data dictionary; its 'text' and 'facets' entries
            are read.

    Returns:
        A dictionary holding 'extracted_links' and/or 'extracted_mentions'
        (each key present only when something was found). Empty when the
        post has no facets.
    """
    facets = post_data.get('facets', [])
    if not facets:
        return {}

    text = post_data.get('text', '')
    candidates = {
        'extracted_links': extract_links_from_facets(text, facets),
        'extracted_mentions': extract_mentions_from_facets(text, facets),
    }
    # Keep only the categories that actually produced results.
    return {name: found for name, found in candidates.items() if found}
def convert_to_basic_types(obj):
    """Convert complex Python objects to basic types for JSON/YAML serialization.

    Conversion rules, applied recursively:
      * dicts (including subclasses) -> plain dict with converted values;
        keys are kept as they are
      * lists and tuples (including subclasses) -> plain list
      * any other object exposing ``__dict__`` -> dict of its attributes
      * str / int / float / bool / None -> returned unchanged
      * everything else -> ``str(obj)``

    Containers are tested before ``__dict__`` on purpose: container
    subclasses such as ``collections.OrderedDict`` also expose a
    ``__dict__``, and looking at that first would discard their contents.
    """
    if isinstance(obj, dict):
        return {key: convert_to_basic_types(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [convert_to_basic_types(item) for item in obj]
    if hasattr(obj, '__dict__'):
        # Arbitrary objects are represented by their attribute dictionary.
        return convert_to_basic_types(obj.__dict__)
    if obj is None or isinstance(obj, (str, int, float, bool)):
        return obj
    # Last resort for anything else (bytes, datetimes, ...): its string form.
    return str(obj)
def strip_fields(obj, strip_field_list):
    """Recursively strip fields from a JSON-like object.

    The object is modified IN PLACE and also returned.

    For dictionaries:
      * keys listed in ``strip_field_list`` are removed;
      * string keys starting with ``__`` are removed (non-string keys are
        never treated as such metadata and no longer raise);
      * after recursing, entries whose value is None, an empty dict, an
        empty list or a blank/whitespace-only string are removed.
    For lists, every element is processed and None elements are dropped.
    Any other value is returned untouched.

    Args:
        obj: The dict / list / scalar to clean.
        strip_field_list: Collection of key names to remove.

    Returns:
        The same ``obj`` that was passed in, after cleaning.
    """
    if isinstance(obj, dict):
        # Iterate over a snapshot of the keys because entries are deleted.
        for key in list(obj):
            is_dunder = isinstance(key, str) and key.startswith("__")
            if key in strip_field_list or is_dunder:
                del obj[key]
                continue

            value = strip_fields(obj[key], strip_field_list)
            is_empty = (
                value is None
                or (isinstance(value, (dict, list)) and len(value) == 0)
                or (isinstance(value, str) and value.strip() == "")
            )
            if is_empty:
                del obj[key]
            else:
                obj[key] = value
    elif isinstance(obj, list):
        cleaned = (strip_fields(value, strip_field_list) for value in obj)
        # Slice-assign so the caller's list object itself is updated.
        obj[:] = [item for item in cleaned if item is not None]
    return obj
def enhance_post_with_facets(post_data: Dict) -> Dict:
    """
    Replace a post's raw 'facets' entry with readable extracted fields.

    The dictionary is updated in place: whatever process_post_facets()
    finds is merged into it, and the original 'facets' key is then removed.

    Args:
        post_data: The post data dictionary. Anything that is not a dict is
            handed back unchanged.

    Returns:
        The same post_data object, enhanced with the facet information.
    """
    if isinstance(post_data, dict):
        # Extraction has to happen while 'facets' is still present.
        extracted = process_post_facets(post_data)
        if extracted:
            post_data.update(extracted)
        # The raw facets are no longer needed once their content is lifted.
        post_data.pop('facets', None)
    return post_data
def thread_to_yaml_string(thread, strip_metadata=True):
    """
    Render thread data as a YAML string intended for LLM parsing.

    Steps: the thread is first flattened to basic types, then every
    dictionary carrying both 'text' and 'facets' has its facet information
    extracted, and finally (optionally) the STRIP_FIELDS metadata keys are
    removed before dumping.

    Args:
        thread: The thread data from get_post_thread
        strip_metadata: Whether to strip metadata fields for cleaner output

    Returns:
        YAML-formatted string representation of the thread
    """
    def _walk(node):
        # Lists are rebuilt; dictionaries are enhanced and updated in place.
        if isinstance(node, list):
            return [_walk(child) for child in node]
        if isinstance(node, dict):
            # A dict with both keys looks like a post that has facets.
            if 'text' in node and 'facets' in node:
                node = enhance_post_with_facets(node)
            for name, child in node.items():
                node[name] = _walk(child)
        return node

    # Facets must be handled before any metadata is stripped.
    result = _walk(convert_to_basic_types(thread))
    if strip_metadata:
        result = strip_fields(result, STRIP_FIELDS)
    return yaml.dump(result, indent=2, allow_unicode=True, default_flow_style=False)
def get_session(username: str) -> Optional[str]:
    """Return the stored session string for ``username``.

    The session is read from ``session_<username>.txt`` (relative path).
    Returns None, after a debug log entry, when that file does not exist.
    """
    session_path = f"session_{username}.txt"
    try:
        handle = open(session_path, encoding="UTF-8")
    except FileNotFoundError:
        logger.debug(f"No existing session found for {username}")
        return None
    with handle:
        return handle.read()
def save_session(username: str, session_string: str) -> None:
    """Write ``session_string`` to ``session_<username>.txt``.

    Any existing file of that name is overwritten. A debug log entry is
    emitted once the write has finished.
    """
    session_path = f"session_{username}.txt"
    with open(session_path, mode="w", encoding="UTF-8") as handle:
        handle.write(session_string)
    logger.debug(f"Session saved for {username}")
def on_session_change(username: str, event: SessionEvent, session: Session) -> None:
    """Session-change callback: log the event and persist new sessions.

    Every event is logged. Only CREATE and REFRESH events cause the
    exported session to be saved for ``username``.
    """
    logger.info(f"Session changed: {event} {repr(session)}")
    if event not in (SessionEvent.CREATE, SessionEvent.REFRESH):
        return
    logger.info(f"Saving changed session for {username}")
    save_session(username, session.export())
def init_client(username: str, password: str) -> Client:
    """
    Create a Bluesky client and log it in.

    The PDS is taken from the ``PDS_URI`` environment variable; when that is
    unset or empty, ``https://bsky.social`` is used and a warning is logged.
    A session-change callback is registered so that sessions are persisted
    via on_session_change().

    Login strategy: a saved session (see get_session) is reused when one
    exists. If reusing it fails for any reason -- e.g. the stored session is
    stale -- a warning is logged and a regular username/password login is
    attempted instead of propagating the failure.

    Args:
        username: Account handle/identifier; also keys the session file.
        password: Password used when no usable saved session exists.

    Returns:
        The logged-in client.

    Raises:
        Whatever ``client.login`` raises when the password login fails.
    """
    pds_uri = os.getenv("PDS_URI")
    if not pds_uri:
        logger.warning(
            "No PDS URI provided. Falling back to bsky.social. Note! If you are on a non-Bluesky PDS, this can cause logins to fail. Please provide a PDS URI using the PDS_URI environment variable."
        )
        pds_uri = "https://bsky.social"

    # Print the PDS URI
    logger.info(f"Using PDS URI: {pds_uri}")

    client = Client(pds_uri)
    client.on_session_change(
        lambda event, session: on_session_change(username, event, session)
    )

    session_string = get_session(username)
    if session_string:
        logger.info(f"Reusing existing session for {username}")
        try:
            client.login(session_string=session_string)
            return client
        except Exception as e:
            # Best effort: a broken saved session must not block a login
            # that the password can still complete.
            logger.warning(
                f"Could not reuse saved session for {username} ({e}). Falling back to password login."
            )
    else:
        logger.info(f"Creating new session for {username}")

    client.login(username, password)
    return client
def default_login() -> Client:
    """
    Log in using credentials from the environment.

    Reads ``BSKY_USERNAME`` and ``BSKY_PASSWORD``. If either is unset or
    empty, an error is logged and the process is terminated with exit
    status 1.

    Returns:
        The logged-in client produced by init_client().

    Raises:
        SystemExit: When a required environment variable is missing.
    """
    username = os.getenv("BSKY_USERNAME")
    password = os.getenv("BSKY_PASSWORD")

    if not username:
        logger.error(
            "No username provided. Please provide a username using the BSKY_USERNAME environment variable."
        )
        # SystemExit is raised directly: the exit() helper is injected by the
        # site module and is not guaranteed to exist; it also reports status 0.
        raise SystemExit(1)

    if not password:
        logger.error(
            "No password provided. Please provide a password using the BSKY_PASSWORD environment variable."
        )
        raise SystemExit(1)

    return init_client(username, password)
def reply_to_post(client: Client, text: str, reply_to_uri: str, reply_to_cid: str, root_uri: Optional[str] = None, root_cid: Optional[str] = None) -> Dict[str, Any]:
    """
    Reply to a post on Bluesky with rich text support.

    Mentions (``@handle.domain``) and ``http(s)://`` URLs found in the text
    are turned into rich-text facets. Detection runs on the UTF-8 encoded
    text so that the reported offsets are byte offsets. Mentions and URLs
    are recognised at the very start of the text as well as after any
    non-word character. A mention whose handle cannot be resolved is left
    as plain text (the failure is logged at debug level).

    Args:
        client: Authenticated Bluesky client
        text: The reply text
        reply_to_uri: The URI of the post being replied to (parent)
        reply_to_cid: The CID of the post being replied to (parent)
        root_uri: The URI of the root post (if replying to a reply). If None,
            the parent is used as root and ``root_cid`` is ignored.
        root_cid: The CID of the root post (if replying to a reply)

    Returns:
        The response from sending the post
    """
    import re

    # If root is not provided, this is a reply to the root post
    if root_uri is None:
        root_uri = reply_to_uri
        root_cid = reply_to_cid

    # Create references for the reply
    parent_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=reply_to_uri, cid=reply_to_cid))
    root_ref = models.create_strong_ref(models.ComAtprotoRepoStrongRef.Main(uri=root_uri, cid=root_cid))

    # Parse rich text facets (mentions and URLs)
    facets = []
    text_bytes = text.encode("UTF-8")

    # "(?:^|\W)" = start of text OR a non-word character. (A bracketed
    # "[$|\W]" is a character class and can never match at position 0.)
    # Group 1 is the mention itself, including the leading "@".
    mention_regex = rb"(?:^|\W)(@([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?)"
    for m in re.finditer(mention_regex, text_bytes):
        handle = m.group(1)[1:].decode("UTF-8")  # Remove @ prefix
        try:
            # Resolve handle to DID using the API
            resolve_resp = client.app.bsky.actor.get_profile({'actor': handle})
            if resolve_resp and hasattr(resolve_resp, 'did'):
                facets.append(
                    models.AppBskyRichtextFacet.Main(
                        index=models.AppBskyRichtextFacet.ByteSlice(
                            byteStart=m.start(1),
                            byteEnd=m.end(1)
                        ),
                        features=[models.AppBskyRichtextFacet.Mention(did=resolve_resp.did)]
                    )
                )
        except Exception as e:
            # Best effort: an unresolvable handle just stays plain text.
            logger.debug(f"Failed to resolve handle {handle}: {e}")
            continue

    # Same anchoring as for mentions; group 1 is the URL.
    url_regex = rb"(?:^|\W)(https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*[-a-zA-Z0-9@%_\+~#//=])?)"
    for m in re.finditer(url_regex, text_bytes):
        url = m.group(1).decode("UTF-8")
        facets.append(
            models.AppBskyRichtextFacet.Main(
                index=models.AppBskyRichtextFacet.ByteSlice(
                    byteStart=m.start(1),
                    byteEnd=m.end(1)
                ),
                features=[models.AppBskyRichtextFacet.Link(uri=url)]
            )
        )

    # Send the reply; facets are only attached when some were found
    # (None is passed otherwise, which leaves the post without facets).
    response = client.send_post(
        text=text,
        reply_to=models.AppBskyFeedPost.ReplyRef(parent=parent_ref, root=root_ref),
        facets=facets or None
    )

    logger.info(f"Reply sent successfully: {response.uri}")
    return response
def get_post_thread(client: Client, uri: str, parent_height: int = 60, depth: int = 10) -> Optional[Dict[str, Any]]:
    """
    Get the thread containing a post to find root post information.

    Args:
        client: Authenticated Bluesky client
        uri: The URI of the post
        parent_height: Value sent as the request's 'parent_height'
            parameter (defaults to 60, the previously fixed value)
        depth: Value sent as the request's 'depth' parameter (defaults to
            10, the previously fixed value)

    Returns:
        The thread response as returned by the client, or None if the
        request raised (the error is logged, not propagated)
    """
    params = {'uri': uri, 'parent_height': parent_height, 'depth': depth}
    try:
        return client.app.bsky.feed.get_post_thread(params)
    except Exception as e:
        logger.error(f"Error fetching post thread: {e}")
        return None
def reply_to_notification(client: Client, notification: Any, reply_text: str) -> Optional[Dict[str, Any]]:
    """
    Reply to a notification (mention or reply).

    The notification's post becomes the parent of the reply. The thread
    root is determined in this order:
      1. the root reference stored in the notified post's own record
         (``thread.post.record.reply.root``), when the thread could be
         fetched and the record carries one;
      2. otherwise the top-most post reached by following ``parent`` links
         of the fetched thread;
      3. otherwise the notified post itself.

    Args:
        client: Authenticated Bluesky client
        notification: The notification object from list_notifications
            (either a dict or an object exposing ``uri`` and ``cid``)
        reply_text: The text to reply with

    Returns:
        The response from sending the reply or None if failed (failures are
        logged, never raised)
    """
    def _field(source, name):
        # Uniform access for dict-shaped and attribute-shaped data.
        if isinstance(source, dict):
            return source.get(name)
        return getattr(source, name, None)

    try:
        # Get the post URI and CID from the notification (dict or object)
        post_uri = _field(notification, 'uri')
        post_cid = _field(notification, 'cid')

        if not post_uri or not post_cid:
            logger.error("Notification doesn't have required uri/cid fields")
            return None

        # Default: the notified post is itself the root of the thread.
        root_uri, root_cid = post_uri, post_cid

        thread_data = get_post_thread(client, post_uri)
        thread = getattr(thread_data, 'thread', None) if thread_data else None

        if thread is not None:
            # Preferred source: the root the notified post itself declares.
            # It stays available even when the hydrated parent chain is
            # truncated or contains posts that could not be loaded.
            record = _field(_field(thread, 'post'), 'record')
            declared_root = _field(_field(record, 'reply'), 'root')
            declared_uri = _field(declared_root, 'uri')
            declared_cid = _field(declared_root, 'cid')

            if declared_uri and declared_cid:
                root_uri, root_cid = declared_uri, declared_cid
            else:
                # Fallback: keep going up until there is no parent left.
                current = thread
                while getattr(current, 'parent', None):
                    current = current.parent
                top_post = getattr(current, 'post', None)
                if (
                    current is not thread
                    and hasattr(top_post, 'uri')
                    and hasattr(top_post, 'cid')
                ):
                    root_uri = top_post.uri
                    root_cid = top_post.cid

        # Reply to the notification
        return reply_to_post(
            client=client,
            text=reply_text,
            reply_to_uri=post_uri,
            reply_to_cid=post_cid,
            root_uri=root_uri,
            root_cid=root_cid
        )
    except Exception as e:
        logger.error(f"Error replying to notification: {e}")
        return None
if __name__ == "__main__":
    # Log in with the credentials taken from the environment.
    client = default_login()

    # Example usage: fetch a (placeholder) thread and show the YAML
    # rendering, which includes the links extracted from facets.
    example_uri = "at://did:example/app.bsky.feed.post/example123"
    fetched_thread = get_post_thread(client, example_uri)
    if fetched_thread:
        rendered = thread_to_yaml_string(fetched_thread)
        print("Enhanced YAML output with extracted links:")
        print(rendered)

    logger.info("Client is ready to use!")