|
|
"""
|
|
|
KV Storage integration for OpenManus
|
|
|
Provides interface to Cloudflare KV operations
|
|
|
"""
|
|
|
|
|
|
import json
|
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
from app.logger import logger
|
|
|
|
|
|
from .client import CloudflareClient, CloudflareError
|
|
|
|
|
|
|
|
|
class KVStorage:
|
|
|
"""Cloudflare KV Storage client"""
|
|
|
|
|
|
def __init__(
|
|
|
self,
|
|
|
client: CloudflareClient,
|
|
|
sessions_namespace_id: str,
|
|
|
cache_namespace_id: str,
|
|
|
):
|
|
|
self.client = client
|
|
|
self.sessions_namespace_id = sessions_namespace_id
|
|
|
self.cache_namespace_id = cache_namespace_id
|
|
|
self.base_endpoint = f"accounts/{client.account_id}/storage/kv/namespaces"
|
|
|
|
|
|
def _get_namespace_id(self, namespace_type: str) -> str:
|
|
|
"""Get namespace ID based on type"""
|
|
|
if namespace_type == "cache":
|
|
|
return self.cache_namespace_id
|
|
|
return self.sessions_namespace_id
|
|
|
|
|
|
async def set_value(
|
|
|
self,
|
|
|
key: str,
|
|
|
value: Any,
|
|
|
namespace_type: str = "sessions",
|
|
|
ttl: Optional[int] = None,
|
|
|
use_worker: bool = True,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Set a value in KV store"""
|
|
|
|
|
|
namespace_id = self._get_namespace_id(namespace_type)
|
|
|
|
|
|
|
|
|
if isinstance(value, (dict, list)):
|
|
|
serialized_value = json.dumps(value)
|
|
|
elif isinstance(value, str):
|
|
|
serialized_value = value
|
|
|
else:
|
|
|
serialized_value = json.dumps(value)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
set_data = {
|
|
|
"key": key,
|
|
|
"value": serialized_value,
|
|
|
"namespace": namespace_type,
|
|
|
}
|
|
|
|
|
|
if ttl:
|
|
|
set_data["ttl"] = ttl
|
|
|
|
|
|
response = await self.client.post(
|
|
|
f"api/kv/set", data=set_data, use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
|
|
|
params = {}
|
|
|
if ttl:
|
|
|
params["expiration_ttl"] = ttl
|
|
|
|
|
|
query_string = "&".join([f"{k}={v}" for k, v in params.items()])
|
|
|
endpoint = f"{self.base_endpoint}/{namespace_id}/values/{key}"
|
|
|
if query_string:
|
|
|
endpoint += f"?{query_string}"
|
|
|
|
|
|
response = await self.client.put(
|
|
|
endpoint, data={"value": serialized_value}
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"key": key,
|
|
|
"namespace": namespace_type,
|
|
|
"ttl": ttl,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"KV set value failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def get_value(
|
|
|
self,
|
|
|
key: str,
|
|
|
namespace_type: str = "sessions",
|
|
|
parse_json: bool = True,
|
|
|
use_worker: bool = True,
|
|
|
) -> Optional[Any]:
|
|
|
"""Get a value from KV store"""
|
|
|
|
|
|
namespace_id = self._get_namespace_id(namespace_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.get(
|
|
|
f"api/kv/get/{key}?namespace={namespace_type}", use_worker=True
|
|
|
)
|
|
|
|
|
|
if response and "value" in response:
|
|
|
value = response["value"]
|
|
|
|
|
|
if parse_json and isinstance(value, str):
|
|
|
try:
|
|
|
return json.loads(value)
|
|
|
except json.JSONDecodeError:
|
|
|
return value
|
|
|
|
|
|
return value
|
|
|
else:
|
|
|
response = await self.client.get(
|
|
|
f"{self.base_endpoint}/{namespace_id}/values/{key}"
|
|
|
)
|
|
|
|
|
|
|
|
|
value = (
|
|
|
response.get("result", {}).get("value")
|
|
|
if "result" in response
|
|
|
else response
|
|
|
)
|
|
|
|
|
|
if value and parse_json and isinstance(value, str):
|
|
|
try:
|
|
|
return json.loads(value)
|
|
|
except json.JSONDecodeError:
|
|
|
return value
|
|
|
|
|
|
return value
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
if e.status_code == 404:
|
|
|
return None
|
|
|
logger.error(f"KV get value failed: {e}")
|
|
|
raise
|
|
|
|
|
|
return None
|
|
|
|
|
|
async def delete_value(
|
|
|
self, key: str, namespace_type: str = "sessions", use_worker: bool = True
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Delete a value from KV store"""
|
|
|
|
|
|
namespace_id = self._get_namespace_id(namespace_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
response = await self.client.delete(
|
|
|
f"api/kv/delete/{key}?namespace={namespace_type}", use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
response = await self.client.delete(
|
|
|
f"{self.base_endpoint}/{namespace_id}/values/{key}"
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"success": True,
|
|
|
"key": key,
|
|
|
"namespace": namespace_type,
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"KV delete value failed: {e}")
|
|
|
raise
|
|
|
|
|
|
async def list_keys(
|
|
|
self,
|
|
|
namespace_type: str = "sessions",
|
|
|
prefix: str = "",
|
|
|
limit: int = 1000,
|
|
|
use_worker: bool = True,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""List keys in KV namespace"""
|
|
|
|
|
|
namespace_id = self._get_namespace_id(namespace_type)
|
|
|
|
|
|
try:
|
|
|
if use_worker:
|
|
|
params = {"namespace": namespace_type, "prefix": prefix, "limit": limit}
|
|
|
|
|
|
query_string = "&".join([f"{k}={v}" for k, v in params.items() if v])
|
|
|
response = await self.client.get(
|
|
|
f"api/kv/list?{query_string}", use_worker=True
|
|
|
)
|
|
|
else:
|
|
|
params = {"prefix": prefix, "limit": limit}
|
|
|
|
|
|
query_string = "&".join([f"{k}={v}" for k, v in params.items() if v])
|
|
|
response = await self.client.get(
|
|
|
f"{self.base_endpoint}/{namespace_id}/keys?{query_string}"
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"namespace": namespace_type,
|
|
|
"prefix": prefix,
|
|
|
"keys": (
|
|
|
response.get("result", [])
|
|
|
if "result" in response
|
|
|
else response.get("keys", [])
|
|
|
),
|
|
|
**response,
|
|
|
}
|
|
|
|
|
|
except CloudflareError as e:
|
|
|
logger.error(f"KV list keys failed: {e}")
|
|
|
raise
|
|
|
|
|
|
|
|
|
async def set_session(
|
|
|
self,
|
|
|
session_id: str,
|
|
|
session_data: Dict[str, Any],
|
|
|
ttl: int = 86400,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Set session data"""
|
|
|
|
|
|
data = {
|
|
|
**session_data,
|
|
|
"created_at": session_data.get("created_at", int(time.time())),
|
|
|
"expires_at": int(time.time()) + ttl,
|
|
|
}
|
|
|
|
|
|
return await self.set_value(f"session:{session_id}", data, "sessions", ttl)
|
|
|
|
|
|
async def get_session(self, session_id: str) -> Optional[Dict[str, Any]]:
|
|
|
"""Get session data"""
|
|
|
|
|
|
session = await self.get_value(f"session:{session_id}", "sessions")
|
|
|
|
|
|
if session and isinstance(session, dict):
|
|
|
|
|
|
expires_at = session.get("expires_at")
|
|
|
if expires_at and int(time.time()) > expires_at:
|
|
|
await self.delete_session(session_id)
|
|
|
return None
|
|
|
|
|
|
return session
|
|
|
|
|
|
async def delete_session(self, session_id: str) -> Dict[str, Any]:
|
|
|
"""Delete session data"""
|
|
|
|
|
|
return await self.delete_value(f"session:{session_id}", "sessions")
|
|
|
|
|
|
async def update_session(
|
|
|
self, session_id: str, updates: Dict[str, Any], extend_ttl: Optional[int] = None
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Update session data"""
|
|
|
|
|
|
existing_session = await self.get_session(session_id)
|
|
|
|
|
|
if not existing_session:
|
|
|
raise CloudflareError("Session not found")
|
|
|
|
|
|
updated_data = {**existing_session, **updates, "updated_at": int(time.time())}
|
|
|
|
|
|
|
|
|
ttl = None
|
|
|
if extend_ttl:
|
|
|
ttl = extend_ttl
|
|
|
elif existing_session.get("expires_at"):
|
|
|
ttl = max(0, existing_session["expires_at"] - int(time.time()))
|
|
|
|
|
|
return await self.set_session(session_id, updated_data, ttl or 86400)
|
|
|
|
|
|
|
|
|
async def set_cache(
|
|
|
self, key: str, data: Any, ttl: int = 3600
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Set cache data"""
|
|
|
|
|
|
cache_data = {
|
|
|
"data": data,
|
|
|
"cached_at": int(time.time()),
|
|
|
"expires_at": int(time.time()) + ttl,
|
|
|
}
|
|
|
|
|
|
return await self.set_value(f"cache:{key}", cache_data, "cache", ttl)
|
|
|
|
|
|
async def get_cache(self, key: str) -> Optional[Any]:
|
|
|
"""Get cache data"""
|
|
|
|
|
|
cached = await self.get_value(f"cache:{key}", "cache")
|
|
|
|
|
|
if cached and isinstance(cached, dict):
|
|
|
|
|
|
expires_at = cached.get("expires_at")
|
|
|
if expires_at and int(time.time()) > expires_at:
|
|
|
await self.delete_cache(key)
|
|
|
return None
|
|
|
|
|
|
return cached.get("data")
|
|
|
|
|
|
return cached
|
|
|
|
|
|
async def delete_cache(self, key: str) -> Dict[str, Any]:
|
|
|
"""Delete cache data"""
|
|
|
|
|
|
return await self.delete_value(f"cache:{key}", "cache")
|
|
|
|
|
|
|
|
|
async def set_user_cache(
|
|
|
self, user_id: str, key: str, data: Any, ttl: int = 3600
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Set user-specific cache"""
|
|
|
|
|
|
user_key = f"user:{user_id}:{key}"
|
|
|
return await self.set_cache(user_key, data, ttl)
|
|
|
|
|
|
async def get_user_cache(self, user_id: str, key: str) -> Optional[Any]:
|
|
|
"""Get user-specific cache"""
|
|
|
|
|
|
user_key = f"user:{user_id}:{key}"
|
|
|
return await self.get_cache(user_key)
|
|
|
|
|
|
async def delete_user_cache(self, user_id: str, key: str) -> Dict[str, Any]:
|
|
|
"""Delete user-specific cache"""
|
|
|
|
|
|
user_key = f"user:{user_id}:{key}"
|
|
|
return await self.delete_cache(user_key)
|
|
|
|
|
|
async def get_user_cache_keys(self, user_id: str, limit: int = 100) -> List[str]:
|
|
|
"""Get all cache keys for a user"""
|
|
|
|
|
|
result = await self.list_keys("cache", f"cache:user:{user_id}:", limit)
|
|
|
|
|
|
keys = []
|
|
|
for key_info in result.get("keys", []):
|
|
|
if isinstance(key_info, dict):
|
|
|
key = key_info.get("name", "")
|
|
|
else:
|
|
|
key = str(key_info)
|
|
|
|
|
|
|
|
|
if key.startswith(f"cache:user:{user_id}:"):
|
|
|
clean_key = key.replace(f"cache:user:{user_id}:", "")
|
|
|
keys.append(clean_key)
|
|
|
|
|
|
return keys
|
|
|
|
|
|
|
|
|
async def cache_conversation(
|
|
|
self,
|
|
|
conversation_id: str,
|
|
|
messages: List[Dict[str, Any]],
|
|
|
ttl: int = 7200,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Cache conversation messages"""
|
|
|
|
|
|
return await self.set_cache(
|
|
|
f"conversation:{conversation_id}",
|
|
|
{"messages": messages, "last_updated": int(time.time())},
|
|
|
ttl,
|
|
|
)
|
|
|
|
|
|
async def get_cached_conversation(
|
|
|
self, conversation_id: str
|
|
|
) -> Optional[Dict[str, Any]]:
|
|
|
"""Get cached conversation"""
|
|
|
|
|
|
return await self.get_cache(f"conversation:{conversation_id}")
|
|
|
|
|
|
|
|
|
async def cache_agent_execution(
|
|
|
self, execution_id: str, execution_data: Dict[str, Any], ttl: int = 3600
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Cache agent execution data"""
|
|
|
|
|
|
return await self.set_cache(f"execution:{execution_id}", execution_data, ttl)
|
|
|
|
|
|
async def get_cached_agent_execution(
|
|
|
self, execution_id: str
|
|
|
) -> Optional[Dict[str, Any]]:
|
|
|
"""Get cached agent execution"""
|
|
|
|
|
|
return await self.get_cache(f"execution:{execution_id}")
|
|
|
|
|
|
|
|
|
async def set_batch(
|
|
|
self,
|
|
|
items: List[Dict[str, Any]],
|
|
|
namespace_type: str = "cache",
|
|
|
ttl: Optional[int] = None,
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Set multiple values (simulated batch operation)"""
|
|
|
|
|
|
results = []
|
|
|
successful = 0
|
|
|
failed = 0
|
|
|
|
|
|
for item in items:
|
|
|
try:
|
|
|
key = item["key"]
|
|
|
value = item["value"]
|
|
|
item_ttl = item.get("ttl", ttl)
|
|
|
|
|
|
result = await self.set_value(key, value, namespace_type, item_ttl)
|
|
|
results.append({"key": key, "success": True, "result": result})
|
|
|
successful += 1
|
|
|
|
|
|
except Exception as e:
|
|
|
results.append(
|
|
|
{"key": item.get("key"), "success": False, "error": str(e)}
|
|
|
)
|
|
|
failed += 1
|
|
|
|
|
|
return {
|
|
|
"success": failed == 0,
|
|
|
"successful": successful,
|
|
|
"failed": failed,
|
|
|
"total": len(items),
|
|
|
"results": results,
|
|
|
}
|
|
|
|
|
|
async def get_batch(
|
|
|
self, keys: List[str], namespace_type: str = "cache"
|
|
|
) -> Dict[str, Any]:
|
|
|
"""Get multiple values (simulated batch operation)"""
|
|
|
|
|
|
results = {}
|
|
|
|
|
|
for key in keys:
|
|
|
try:
|
|
|
value = await self.get_value(key, namespace_type)
|
|
|
results[key] = value
|
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to get key {key}: {e}")
|
|
|
results[key] = None
|
|
|
|
|
|
return results
|
|
|
|
|
|
def _hash_params(self, params: Dict[str, Any]) -> str:
|
|
|
"""Create a hash for cache keys from parameters"""
|
|
|
|
|
|
if not params:
|
|
|
return "no-params"
|
|
|
|
|
|
|
|
|
import hashlib
|
|
|
|
|
|
params_str = json.dumps(params, sort_keys=True)
|
|
|
return hashlib.md5(params_str.encode()).hexdigest()[:16]
|
|
|
|
|
|
|
|
|
|
|
|
import time
|
|
|
|